Repository: flume
Updated Branches:
  refs/heads/flume-1.7 a6b55f183 -> b03ad7168


FLUME-2875. Allow RollingFileSink to specify a file prefix and a file extension.

(Ralph Goers via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/b03ad716
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/b03ad716
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/b03ad716

Branch: refs/heads/flume-1.7
Commit: b03ad71685b1032083145ee0a48ac5e4de8570a3
Parents: a6b55f1
Author: Hari Shreedharan <[email protected]>
Authored: Mon Feb 8 17:40:35 2016 -0800
Committer: Hari Shreedharan <[email protected]>
Committed: Mon Feb 8 17:41:44 2016 -0800

----------------------------------------------------------------------
 .../formatter/output/DefaultPathManager.java    | 108 +++++++++++++++++++
 .../flume/formatter/output/PathManager.java     |  63 ++++-------
 .../formatter/output/PathManagerFactory.java    |  82 ++++++++++++++
 .../flume/formatter/output/PathManagerType.java |  43 ++++++++
 .../formatter/output/RollTimePathManager.java   |  66 ++++++++++++
 .../org/apache/flume/sink/RollingFileSink.java  |   8 +-
 .../apache/flume/sink/TestRollingFileSink.java  | 106 ++++++++++++++++++
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  53 ++++-----
 8 files changed, 460 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/b03ad716/flume-ng-core/src/main/java/org/apache/flume/formatter/output/DefaultPathManager.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/DefaultPathManager.java
 
b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/DefaultPathManager.java
new file mode 100644
index 0000000..176db7f
--- /dev/null
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/DefaultPathManager.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.formatter.output;
+
+import java.io.File;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.flume.Context;
+
+/** {@link PathManager} that names files {@code <prefix><seriesTimestamp>-<index>[.<extension>]}. */
+public class DefaultPathManager implements PathManager {
+  private final long seriesTimestamp;
+  private File baseDirectory;
+  private final AtomicInteger fileIndex;
+  private final String filePrefix;
+  private final String extension;
+
+  private static final String DEFAULT_FILE_PREFIX = "";
+  private static final String DEFAULT_FILE_EXTENSION = "";
+  private static final String FILE_EXTENSION = "extension";
+  private static final String FILE_PREFIX = "prefix";
+
+  /** File last returned by {@link #nextFile()}; null until then and after {@link #rotate()}. */
+  protected File currentFile;
+
+  /** Reads the optional {@code prefix} and {@code extension} keys, both defaulting to "". */
+  public DefaultPathManager(Context context) {
+    filePrefix = context.getString(FILE_PREFIX, DEFAULT_FILE_PREFIX);
+    extension = context.getString(FILE_EXTENSION, DEFAULT_FILE_EXTENSION);
+    seriesTimestamp = System.currentTimeMillis();  // fixed for the lifetime of this manager
+    fileIndex = new AtomicInteger();
+  }
+
+  /** Advances the index (the first file gets 1) and returns the newly named current file. */
+  @Override
+  public File nextFile() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(filePrefix).append(seriesTimestamp).append("-");
+    sb.append(fileIndex.incrementAndGet());
+    if (!extension.isEmpty()) {
+      sb.append(".").append(extension);
+    }
+    currentFile = new File(baseDirectory, sb.toString());
+    return currentFile;
+  }
+
+  /** Returns the current file, naming a new one via {@link #nextFile()} if there is none. */
+  @Override
+  public File getCurrentFile() {
+    return currentFile == null ? nextFile() : currentFile;
+  }
+
+  /** Drops the current file so the next {@link #getCurrentFile()} call names a new one. */
+  @Override
+  public void rotate() {
+    currentFile = null;
+  }
+
+  @Override
+  public File getBaseDirectory() {
+    return baseDirectory;
+  }
+
+  @Override
+  public void setBaseDirectory(File baseDirectory) {
+    this.baseDirectory = baseDirectory;
+  }
+
+  public long getSeriesTimestamp() {
+    return seriesTimestamp;
+  }
+
+  public String getPrefix() {
+    return filePrefix;
+  }
+
+  public String getExtension() {
+    return extension;
+  }
+
+  /** Exposes the live counter so that subclasses can reset the index. */
+  public AtomicInteger getFileIndex() {
+    return fileIndex;
+  }
+
+  public static class Builder implements PathManager.Builder {
+    @Override
+    public PathManager build(Context context) {
+      return new DefaultPathManager(context);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/b03ad716/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManager.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManager.java
 
b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManager.java
index 933cc94..5a3066a 100644
--- 
a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManager.java
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManager.java
@@ -16,58 +16,35 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.flume.formatter.output;
 
 import java.io.File;
-import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.flume.Context;
 
-public class PathManager {
+/**
+ * Names the files written by the RollingFileSink and tracks the one currently in use.
+ */
+public interface PathManager {
+    /**
+     * Prefix, after {@code sink.}, of the {@link Context} keys handed to path managers.
+     */
+    public static String CTX_PREFIX = "pathManager.";
 
-  private long seriesTimestamp;
-  private File baseDirectory;
-  private AtomicInteger fileIndex;
+    File nextFile();  // names, records and returns a new current file
 
-  private File currentFile;
+    File getCurrentFile();  // the current file, or a new one from nextFile() if there is none
 
-  public PathManager() {
-    seriesTimestamp = System.currentTimeMillis();
-    fileIndex = new AtomicInteger();
-  }
+    void rotate();  // forgets the current file; the next getCurrentFile() names a new one
 
-  public File nextFile() {
-    currentFile = new File(baseDirectory, seriesTimestamp + "-"
-        + fileIndex.incrementAndGet());
+    File getBaseDirectory();
 
-    return currentFile;
-  }
+    void setBaseDirectory(File baseDirectory);
 
-  public File getCurrentFile() {
-    if (currentFile == null) {
-      return nextFile();
+    /**
+     * Constructs a {@link PathManager} from its {@link Context}.<br/>
+     * <b>Note: Implementations MUST provide a public no-arg constructor.</b>
+     */
+    public interface Builder {
+        public PathManager build(Context context);
     }
-
-    return currentFile;
-  }
-
-  public void rotate() {
-    currentFile = null;
-  }
-
-  public File getBaseDirectory() {
-    return baseDirectory;
-  }
-
-  public void setBaseDirectory(File baseDirectory) {
-    this.baseDirectory = baseDirectory;
-  }
-
-  public long getSeriesTimestamp() {
-    return seriesTimestamp;
-  }
-
-  public AtomicInteger getFileIndex() {
-    return fileIndex;
-  }
-
 
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/b03ad716/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManagerFactory.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManagerFactory.java
 
b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManagerFactory.java
new file mode 100644
index 0000000..4dbe083
--- /dev/null
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManagerFactory.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.formatter.output;
+
+import com.google.common.base.Preconditions;
+import java.util.Locale;
+import org.apache.flume.Context;
+import org.apache.flume.FlumeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Create PathManager instances.
+ */
+public class PathManagerFactory {
+    private static final Logger logger = 
LoggerFactory.getLogger(PathManagerFactory.class);
+
+    public static PathManager getInstance(String managerType, Context context) 
{
+
+        Preconditions.checkNotNull(managerType, "path manager type must not be 
null");
+
+        // try to find builder class in enum of known output serializers
+        PathManagerType type;
+        try {
+            type = 
PathManagerType.valueOf(managerType.toUpperCase(Locale.ENGLISH));
+        } catch (IllegalArgumentException e) {
+            logger.debug("Not in enum, loading builder class: {}", 
managerType);
+            type = PathManagerType.OTHER;
+        }
+        Class<? extends PathManager.Builder> builderClass = 
type.getBuilderClass();
+
+        // handle the case where they have specified their own builder in the 
config
+        if (builderClass == null) {
+            try {
+                Class c = Class.forName(managerType);
+                if (c != null && 
PathManager.Builder.class.isAssignableFrom(c)) {
+                    builderClass = (Class<? extends PathManager.Builder>) c;
+                } else {
+                    String errMessage = "Unable to instantiate Builder from " +
+                            managerType + ": does not appear to implement " +
+                            PathManager.Builder.class.getName();
+                    throw new FlumeException(errMessage);
+                }
+            } catch (ClassNotFoundException ex) {
+                logger.error("Class not found: " + managerType, ex);
+                throw new FlumeException(ex);
+            }
+        }
+
+        // build the builder
+        PathManager.Builder builder;
+        try {
+            builder = builderClass.newInstance();
+        } catch (InstantiationException ex) {
+            String errMessage = "Cannot instantiate builder: " + managerType;
+            logger.error(errMessage, ex);
+            throw new FlumeException(errMessage, ex);
+        } catch (IllegalAccessException ex) {
+            String errMessage = "Cannot instantiate builder: " + managerType;
+            logger.error(errMessage, ex);
+            throw new FlumeException(errMessage, ex);
+        }
+
+        return builder.build(context);
+    }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/b03ad716/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManagerType.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManagerType.java
 
b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManagerType.java
new file mode 100644
index 0000000..4f1fa93
--- /dev/null
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/PathManagerType.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.formatter.output;
+
+import org.apache.flume.annotations.InterfaceAudience;
+import org.apache.flume.annotations.InterfaceStability;
+
+/**
+ * Named {@link PathManager} types; {@link #OTHER} has no builder and marks a custom Builder FQCN.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public enum PathManagerType {
+  DEFAULT(DefaultPathManager.Builder.class),
+  ROLLTIME(RollTimePathManager.Builder.class),
+  OTHER(null);
+
+  private final Class<? extends PathManager.Builder> builderClass;
+  PathManagerType(Class<? extends PathManager.Builder> builderClass) {
+    this.builderClass = builderClass;
+  }
+
+  /** Returns the builder class for this type, or null for {@link #OTHER}. */
+  public Class<? extends PathManager.Builder> getBuilderClass() {
+    return builderClass;
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/b03ad716/flume-ng-core/src/main/java/org/apache/flume/formatter/output/RollTimePathManager.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/RollTimePathManager.java
 
b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/RollTimePathManager.java
new file mode 100644
index 0000000..6883a9c
--- /dev/null
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/RollTimePathManager.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.formatter.output;
+
+import java.io.File;
+
+import org.apache.flume.Context;
+import org.joda.time.LocalDateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+/**
+ * Names files {@code <prefix><yyyyMMddHHmmss>-<index>[.<extension>]} using the local time of each
+ * {@link #nextFile()} call; the index restarts at 1 whenever that timestamp changes.
+ */
+public class RollTimePathManager extends DefaultPathManager {
+  // Joda-Time formatters are immutable and thread-safe, so one shared instance suffices.
+  private static final DateTimeFormatter FORMATTER = DateTimeFormat.forPattern("yyyyMMddHHmmss");
+  private String lastRoll;
+
+  public RollTimePathManager(Context context) {
+    super(context);
+  }
+
+  // NOTE(review): local wall-clock time repeats in a DST fall-back hour, so names can recur.
+  @Override
+  public File nextFile() {
+    String date = FORMATTER.print(LocalDateTime.now());
+    if (!date.equals(lastRoll)) {
+      getFileIndex().set(0);
+      lastRoll = date;
+    }
+    StringBuilder sb = new StringBuilder();
+    sb.append(getPrefix()).append(date).append("-");
+    sb.append(getFileIndex().incrementAndGet());
+    if (!getExtension().isEmpty()) {
+      sb.append(".").append(getExtension());
+    }
+    currentFile = new File(getBaseDirectory(), sb.toString());
+    return currentFile;
+  }
+
+  public static class Builder implements PathManager.Builder {
+    @Override
+    public PathManager build(Context context) {
+      return new RollTimePathManager(context);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/b03ad716/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java 
b/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
index 9cb3370..b97d404 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
@@ -33,6 +33,7 @@ import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.formatter.output.PathManager;
+import org.apache.flume.formatter.output.PathManagerFactory;
 import org.apache.flume.instrumentation.SinkCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,13 +67,13 @@ public class RollingFileSink extends AbstractSink 
implements Configurable {
   private volatile boolean shouldRotate;
 
   public RollingFileSink() {
-    pathController = new PathManager();
     shouldRotate = false;
   }
 
   @Override
   public void configure(Context context) {
 
+    String pathManagerType = context.getString("sink.pathManager", "DEFAULT");
     String directory = context.getString("sink.directory");
     String rollInterval = context.getString("sink.rollInterval");
 
@@ -81,6 +82,11 @@ public class RollingFileSink extends AbstractSink implements 
Configurable {
         new Context(context.getSubProperties("sink." +
             EventSerializer.CTX_PREFIX));
 
+    Context pathManagerContext =
+              new Context(context.getSubProperties("sink." +
+                      PathManager.CTX_PREFIX));
+    pathController = PathManagerFactory.getInstance(pathManagerType, 
pathManagerContext);
+
     Preconditions.checkArgument(directory != null, "Directory may not be 
null");
     Preconditions.checkNotNull(serializerType, "Serializer type is undefined");
 

http://git-wip-us.apache.org/repos/asf/flume/blob/b03ad716/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java 
b/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
index 07fa644..bf4ed1f 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
@@ -172,4 +172,110 @@ public class TestRollingFileSink {
       reader.close();
     }
   }
+
+  /** Verifies that the default path manager applies the configured prefix and extension. */
+  @Test
+  public void testAppend3()
+      throws InterruptedException, LifecycleException, EventDeliveryException, IOException {
+    File tmpDir = new File("target/tmpLog");
+    tmpDir.mkdirs();
+    cleanDirectory(tmpDir);
+    Context context = new Context();
+
+    context.put("sink.directory", "target/tmpLog");
+    context.put("sink.rollInterval", "0");
+    context.put("sink.batchSize", "1");
+    context.put("sink.pathManager.prefix", "test3-");
+    context.put("sink.pathManager.extension", "txt");
+
+    Configurables.configure(sink, context);
+
+    Channel channel = new PseudoTxnMemoryChannel();
+    Configurables.configure(channel, context);
+
+    sink.setChannel(channel);
+    sink.start();
+
+    for (int i = 0; i < 10; i++) {
+      Event event = new SimpleEvent();
+      event.setBody(("Test event " + i).getBytes());
+      channel.put(event);
+      sink.process();
+      Thread.sleep(500);
+    }
+
+    sink.stop();
+
+    for (String file : sink.getDirectory().list()) {
+      // the default path manager names files <prefix><epochMillis>-<index>.<extension>
+      org.junit.Assert.assertTrue(file, file.matches("test3-\\d+-\\d+\\.txt"));
+      BufferedReader reader =
+          new BufferedReader(new FileReader(new File(sink.getDirectory(), file)));
+      try {
+        String currentLine;
+        while ((currentLine = reader.readLine()) != null) {
+          logger.debug("Produced file:{} lastLine:{}", file, currentLine);
+        }
+      } finally {
+        reader.close();
+      }
+    }
+  }
+
+  /** Verifies that the ROLLTIME path manager names files by roll time with prefix and extension. */
+  @Test
+  public void testRollTime()
+      throws InterruptedException, LifecycleException, EventDeliveryException, IOException {
+    File tmpDir = new File("target/tempLog");
+    tmpDir.mkdirs();
+    cleanDirectory(tmpDir);
+    Context context = new Context();
+
+    context.put("sink.directory", "target/tempLog/");
+    context.put("sink.rollInterval", "1");
+    context.put("sink.batchSize", "1");
+    context.put("sink.pathManager", "rolltime");
+    context.put("sink.pathManager.prefix", "test4-");
+    context.put("sink.pathManager.extension", "txt");
+
+    Configurables.configure(sink, context);
+
+    Channel channel = new PseudoTxnMemoryChannel();
+    Configurables.configure(channel, context);
+
+    sink.setChannel(channel);
+    sink.start();
+
+    for (int i = 0; i < 10; i++) {
+      Event event = new SimpleEvent();
+      event.setBody(("Test event " + i).getBytes());
+      channel.put(event);
+      sink.process();
+      Thread.sleep(500);
+    }
+
+    sink.stop();
+
+    for (String file : sink.getDirectory().list()) {
+      // RollTimePathManager names files <prefix><yyyyMMddHHmmss>-<index>.<extension>
+      org.junit.Assert.assertTrue(file, file.matches("test4-\\d{14}-\\d+\\.txt"));
+      BufferedReader reader =
+          new BufferedReader(new FileReader(new File(sink.getDirectory(), file)));
+      try {
+        String currentLine;
+        while ((currentLine = reader.readLine()) != null) {
+          logger.debug("Produced file:{} lastLine:{}", file, currentLine);
+        }
+      } finally {
+        reader.close();
+      }
+    }
+  }
+
+  private void cleanDirectory(File dir) {
+    File[] files = dir.listFiles();  // null if dir is missing or cannot be listed
+    for (File file : files == null ? new File[0] : files) {
+      file.delete();  // best-effort cleanup; return value intentionally ignored
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/b03ad716/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst 
b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 0f8461d..423e0cf 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -28,22 +28,22 @@ Apache Flume is a distributed, reliable, and available 
system for efficiently
 collecting, aggregating and moving large amounts of log data from many
 different sources to a centralized data store.
 
-The use of Apache Flume is not only restricted to log data aggregation. 
+The use of Apache Flume is not only restricted to log data aggregation.
 Since data sources are customizable, Flume can be used to transport massive 
quantities
-of event data including but not limited to network traffic data, 
social-media-generated data, 
+of event data including but not limited to network traffic data, 
social-media-generated data,
 email messages and pretty much any data source possible.
 
 Apache Flume is a top level project at the Apache Software Foundation.
 
 There are currently two release code lines available, versions 0.9.x and 1.x.
 
-Documentation for the 0.9.x track is available at 
+Documentation for the 0.9.x track is available at
 `the Flume 0.9.x User Guide 
<http://archive.cloudera.com/cdh/3/flume/UserGuide/>`_.
 
 This documentation applies to the 1.4.x track.
 
-New and existing users are encouraged to use the 1.x releases so as to 
-leverage the performance improvements and configuration flexibilities 
available 
+New and existing users are encouraged to use the 1.x releases so as to
+leverage the performance improvements and configuration flexibilities available
 in the latest architecture.
 
 
@@ -1153,7 +1153,7 @@ Twitter 1% firehose Source (experimental)
 
 Experimental source that connects via Streaming API to the 1% sample twitter
 firehose, continously downloads tweets, converts them to Avro format and
-sends Avro events to a downstream Flume sink. Requires the consumer and 
+sends Avro events to a downstream Flume sink. Requires the consumer and
 access tokens and secrets of a Twitter developer account.
 Required properties are in **bold**.
 
@@ -1165,7 +1165,7 @@ Property Name          Default      Description
 **consumerKey**        --           OAuth consumer key
 **consumerSecret**     --           OAuth consumer secret
 **accessToken**        --           OAuth access token
-**accessTokenSecret**  --           OAuth toekn secret 
+**accessTokenSecret**  --           OAuth token secret
 maxBatchSize           1000         Maximum number of twitter messages to put 
in a single batch
 maxBatchDurationMillis 1000         Maximum number of milliseconds to wait 
before closing a batch
 ====================== ===========  
===================================================
@@ -2119,16 +2119,19 @@ File Roll Sink
 Stores events on the local filesystem.
 Required properties are in **bold**.
 
-===================  =======  
======================================================================================================================
-Property Name        Default  Description
-===================  =======  
======================================================================================================================
-**channel**          --
-**type**             --       The component type name, needs to be 
``file_roll``.
-**sink.directory**   --       The directory where files will be stored
-sink.rollInterval    30       Roll the file every 30 seconds. Specifying 0 
will disable rolling and cause all events to be written to a single file.
-sink.serializer      TEXT     Other possible options include ``avro_event`` or 
the FQCN of an implementation of EventSerializer.Builder interface.
-batchSize            100
-===================  =======  
======================================================================================================================
+==========================  =======  
======================================================================================================================
+Property Name               Default  Description
+==========================  =======  
======================================================================================================================
+**channel**                 --
+**type**                    --       The component type name, needs to be 
``file_roll``.
+**sink.directory**          --       The directory where files will be stored
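+sink.pathManager            DEFAULT  The PathManager implementation to use: ``DEFAULT``, ``ROLLTIME`` (names files after the roll time, formatted yyyyMMddHHmmss) or the FQCN of an implementation of the PathManager.Builder interface.
+sink.pathManager.extension  --       The file extension (without the leading dot) appended to file names by the built-in PathManagers.
+sink.pathManager.prefix     --       A character string added to the beginning of file names by the built-in PathManagers.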
+sink.rollInterval           30       Roll the file every 30 seconds. 
Specifying 0 will disable rolling and cause all events to be written to a 
single file.
+sink.serializer             TEXT     Other possible options include 
``avro_event`` or the FQCN of an implementation of EventSerializer.Builder 
interface.
+batchSize                   100
+==========================  =======  
======================================================================================================================
 
 Example for agent named a1:
 
@@ -2284,19 +2287,19 @@ This sink extracts data from Flume events, transforms 
it, and loads it in near-r
 
 This sink is well suited for use cases that stream raw data into HDFS (via the 
HdfsSink) and simultaneously extract, transform and load the same data into 
Solr (via MorphlineSolrSink). In particular, this sink can process arbitrary 
heterogeneous raw data from disparate data sources and turn it into a data 
model that is useful to Search applications.
 
-The ETL functionality is customizable using a `morphline configuration file 
<http://cloudera.github.io/cdk/docs/current/cdk-morphlines/index.html>`_ that 
defines a chain of transformation commands that pipe event records from one 
command to another. 
+The ETL functionality is customizable using a `morphline configuration file 
<http://cloudera.github.io/cdk/docs/current/cdk-morphlines/index.html>`_ that 
defines a chain of transformation commands that pipe event records from one 
command to another.
 
 Morphlines can be seen as an evolution of Unix pipelines where the data model 
is generalized to work with streams of generic records, including arbitrary 
binary payloads. A morphline command is a bit like a Flume Interceptor. 
Morphlines can be embedded into Hadoop components such as Flume.
 
 Commands to parse and transform a set of standard data formats such as log 
files, Avro, CSV, Text, HTML, XML, PDF, Word, Excel, etc. are provided out of 
the box, and additional custom commands and parsers for additional data formats 
can be added as morphline plugins. Any kind of data format can be indexed and 
any Solr documents for any kind of Solr schema can be generated, and any custom 
ETL logic can be registered and executed.
 
-Morphlines manipulate continuous streams of records. The data model can be 
described as follows: A record is a set of named fields where each field has an 
ordered list of one or more values. A value can be any Java Object. That is, a 
record is essentially a hash table where each hash table entry contains a 
String key and a list of Java Objects as values. (The implementation uses 
Guava's ``ArrayListMultimap``, which is a ``ListMultimap``). Note that a field 
can have multiple values and any two records need not use common field names. 
+Morphlines manipulate continuous streams of records. The data model can be 
described as follows: A record is a set of named fields where each field has an 
ordered list of one or more values. A value can be any Java Object. That is, a 
record is essentially a hash table where each hash table entry contains a 
String key and a list of Java Objects as values. (The implementation uses 
Guava's ``ArrayListMultimap``, which is a ``ListMultimap``). Note that a field 
can have multiple values and any two records need not use common field names.
 
 This sink fills the body of the Flume event into the ``_attachment_body`` 
field of the morphline record, as well as copies the headers of the Flume event 
into record fields of the same name. The commands can then act on this data.
 
 Routing to a SolrCloud cluster is supported to improve scalability. Indexing 
load can be spread across a large number of MorphlineSolrSinks for improved 
scalability. Indexing load can be replicated across multiple MorphlineSolrSinks 
for high availability, for example using Flume features such as Load balancing 
Sink Processor. MorphlineInterceptor can also help to implement dynamic routing 
to multiple Solr collections (e.g. for multi-tenancy).
 
-The morphline and solr jars required for your environment must be placed in 
the lib directory of the Apache Flume installation. 
+The morphline and solr jars required for your environment must be placed in 
the lib directory of the Apache Flume installation.
 
 The type is the FQCN: org.apache.flume.sink.solr.morphline.MorphlineSolrSink
 
@@ -2334,11 +2337,11 @@ ElasticSearchSink
 ~~~~~~~~~~~~~~~~~
 
 This sink writes data to an elasticsearch cluster. By default, events will be 
written so that the `Kibana <http://kibana.org>`_ graphical interface
-can display them - just as if `logstash <https://logstash.net>`_ wrote them. 
+can display them - just as if `logstash <https://logstash.net>`_ wrote them.
 
-The elasticsearch and lucene-core jars required for your environment must be 
placed in the lib directory of the Apache Flume installation. 
+The elasticsearch and lucene-core jars required for your environment must be 
placed in the lib directory of the Apache Flume installation.
 Elasticsearch requires that the major version of the client JAR match that of 
the server and that both are running the same minor version
-of the JVM. SerializationExceptions will appear if this is incorrect. To 
+of the JVM. SerializationExceptions will appear if this is incorrect. To
 select the required version first determine the version of elasticsearch and 
the JVM version the target cluster is running. Then select an elasticsearch 
client
 library which matches the major version. A 0.19.x client can talk to a 0.19.x 
cluster; 0.20.x can talk to 0.20.x and 0.90.x can talk to 0.90.x. Once the
 elasticsearch version has been determined then read the pom.xml file to 
determine the correct lucene-core JAR version to use. The Flume agent
@@ -2588,7 +2591,7 @@ Example for agent named a1:
   a1.channels.c1.transactionCapacity = 10000
   a1.channels.c1.byteCapacityBufferPercentage = 20
   a1.channels.c1.byteCapacity = 800000
-  
+
 
 JDBC Channel
 ~~~~~~~~~~~~
@@ -2796,7 +2799,7 @@ The disk store is managed using an embedded File channel. 
When the in-memory que
 the file channel. This channel is ideal for flows that need high throughput of 
memory channel during normal operation, but at the
 same time need the larger capacity of the file channel for better tolerance of 
intermittent sink side outages or drop in drain rates.
 The throughput will reduce approximately to file channel speeds during such 
abnormal situations. In case of an agent crash or restart,
-only the events stored on disk are recovered when the agent comes online. 
**This channel is currently experimental and 
+only the events stored on disk are recovered when the agent comes online. 
**This channel is currently experimental and
 not recommended for use in production.**
 
 Required properties are in **bold**. Please refer to file channel for 
additional required properties.

Reply via email to