This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 569b8f9  [pulsar-io-hdfs2] Add config to create subdirectory from 
current time (#7771)
569b8f9 is described below

commit 569b8f9fc31bb382a05b6139cd75ed67338028e7
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Aug 12 06:50:44 2020 +0800

    [pulsar-io-hdfs2] Add config to create subdirectory from current time 
(#7771)
    
    ### Motivation
    
    Adding a subdirectory associated with current time will make it easier to 
process HDFS files in batch.
    
    For example, user can create multiple running sink instances with 
`yyyy-MM-dd-hh` pattern. Then stop all instances at the next hour. Eventually, 
files in the subdirectory will contain all messages consumed during this hour.
    
    ### Modifications
    
    - Add a `subdirectoryPattern` field to `HdfsSinkConfig`
    - Update some simple tests for `HdfsSinkConfig`
    - Update the doc of HDFS2 sink
    
    ### Documentation
    
      - Does this pull request introduce a new feature? (yes)
      - If yes, how is the feature documented? (docs)
---
 .../pulsar/io/hdfs2/sink/HdfsAbstractSink.java     | 15 ++++++++++++-
 .../pulsar/io/hdfs2/sink/HdfsSinkConfig.java       | 26 ++++++++++++++++++----
 .../pulsar/io/hdfs2/sink/HdfsSinkConfigTests.java  |  3 +++
 pulsar-io/hdfs2/src/test/resources/sinkConfig.yaml |  3 ++-
 site2/docs/io-hdfs2-sink.md                        |  5 ++++-
 5 files changed, 45 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsAbstractSink.java
 
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsAbstractSink.java
index dbc5881..1d2096d 100644
--- 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsAbstractSink.java
+++ 
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsAbstractSink.java
@@ -19,10 +19,13 @@
 package org.apache.pulsar.io.hdfs2.sink;
 
 import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -39,6 +42,7 @@ import org.apache.pulsar.io.hdfs2.HdfsResources;
  * A Simple abstract class for HDFS sink.
  * Users need to implement extractKeyValue function to use this sink.
  */
+@Slf4j
 public abstract class HdfsAbstractSink<K, V> extends AbstractHdfsConnector 
implements Sink<V> {
 
     protected HdfsSinkConfig hdfsSinkConfig;
@@ -46,6 +50,7 @@ public abstract class HdfsAbstractSink<K, V> extends 
AbstractHdfsConnector imple
     protected HdfsSyncThread<V> syncThread;
     private Path path;
     private FSDataOutputStream hdfsStream;
+    private DateTimeFormatter subdirectoryFormatter;
 
     public abstract KeyValue<K, V> extractKeyValue(Record<V> record);
     protected abstract void createWriter() throws IOException;
@@ -56,6 +61,9 @@ public abstract class HdfsAbstractSink<K, V> extends 
AbstractHdfsConnector imple
        hdfsSinkConfig.validate();
        connectorConfig = hdfsSinkConfig;
        unackedRecords = new LinkedBlockingQueue<Record<V>> 
(hdfsSinkConfig.getMaxPendingRecords());
+       if (hdfsSinkConfig.getSubdirectoryPattern() != null) {
+           subdirectoryFormatter = 
DateTimeFormatter.ofPattern(hdfsSinkConfig.getSubdirectoryPattern());
+       }
        connectToHdfs();
        createWriter();
        launchSyncThread();
@@ -99,8 +107,13 @@ public abstract class HdfsAbstractSink<K, V> extends 
AbstractHdfsConnector imple
                 ext = getCompressionCodec().getDefaultExtension();
             }
 
-            path = new Path(FilenameUtils.concat(hdfsSinkConfig.getDirectory(),
+            String directory = hdfsSinkConfig.getDirectory();
+            if (subdirectoryFormatter != null) {
+                directory = FilenameUtils.concat(directory, 
LocalDateTime.now().format(subdirectoryFormatter));
+            }
+            path = new Path(FilenameUtils.concat(directory,
                     hdfsSinkConfig.getFilenamePrefix() + "-" + 
System.currentTimeMillis() + ext));
+            log.info("Create path: {}", path);
         }
         return path;
     }
diff --git 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfig.java
 
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfig.java
index fa52b6a..2af24fc 100644
--- 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfig.java
+++ 
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfig.java
@@ -24,6 +24,8 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
 import java.util.Map;
 
 import lombok.Data;
@@ -73,6 +75,14 @@ public class HdfsSinkConfig extends AbstractHdfsConfig 
implements Serializable {
      */
     private int maxPendingRecords = Integer.MAX_VALUE;
 
+    /**
+     * A subdirectory associated with the created time of the sink.
+     * The pattern is the formatted pattern of {@link 
AbstractHdfsConfig#getDirectory()}'s subdirectory.
+     *
+     * @see java.time.format.DateTimeFormatter for pattern's syntax
+     */
+    private String subdirectoryPattern;
+
     public static HdfsSinkConfig load(String yamlFile) throws IOException {
        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
        return mapper.readValue(new File(yamlFile), HdfsSinkConfig.class);
@@ -87,16 +97,24 @@ public class HdfsSinkConfig extends AbstractHdfsConfig 
implements Serializable {
     public void validate() {
         super.validate();
         if ((StringUtils.isEmpty(fileExtension) && getCompression() == null)
-            || StringUtils.isEmpty(filenamePrefix)) {
-           throw new IllegalArgumentException("Required property not set.");
+                || StringUtils.isEmpty(filenamePrefix)) {
+            throw new IllegalArgumentException("Required property not set.");
         }
 
         if (syncInterval < 0) {
-          throw new IllegalArgumentException("Sync Interval cannot be 
negative");
+            throw new IllegalArgumentException("Sync Interval cannot be 
negative");
         }
 
         if (maxPendingRecords < 1) {
-          throw new IllegalArgumentException("Max Pending Records must be a 
positive integer");
+            throw new IllegalArgumentException("Max Pending Records must be a 
positive integer");
+        }
+
+        if (subdirectoryPattern != null) {
+            try {
+                LocalDateTime.of(2020, 1, 1, 12, 
0).format(DateTimeFormatter.ofPattern(subdirectoryPattern));
+            } catch (Exception e) {
+                throw new IllegalArgumentException(subdirectoryPattern + " is 
not a valid pattern: " + e.getMessage());
+            }
         }
     }
 }
diff --git 
a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfigTests.java
 
b/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfigTests.java
index d4e1f03..aa76064 100644
--- 
a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfigTests.java
+++ 
b/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfigTests.java
@@ -44,6 +44,7 @@ public class HdfsSinkConfigTests {
                assertEquals("/foo/bar", config.getDirectory());
                assertEquals("prefix", config.getFilenamePrefix());
                assertEquals(Compression.SNAPPY, config.getCompression());
+               assertEquals("yyyy-MM-dd", config.getSubdirectoryPattern());
        }
        
        @Test
@@ -53,6 +54,7 @@ public class HdfsSinkConfigTests {
                map.put("directory", "/foo/bar");
                map.put("filenamePrefix", "prefix");
                map.put("compression", "SNAPPY");
+               map.put("subdirectoryPattern", "yy-MM-dd");
                
                HdfsSinkConfig config = HdfsSinkConfig.load(map);
                assertNotNull(config);
@@ -60,6 +62,7 @@ public class HdfsSinkConfigTests {
                assertEquals("/foo/bar", config.getDirectory());
                assertEquals("prefix", config.getFilenamePrefix());
                assertEquals(Compression.SNAPPY, config.getCompression());
+               assertEquals("yy-MM-dd", config.getSubdirectoryPattern());
        }
        
        @Test
diff --git a/pulsar-io/hdfs2/src/test/resources/sinkConfig.yaml 
b/pulsar-io/hdfs2/src/test/resources/sinkConfig.yaml
index 5a19ee0..47ab4f9 100644
--- a/pulsar-io/hdfs2/src/test/resources/sinkConfig.yaml
+++ b/pulsar-io/hdfs2/src/test/resources/sinkConfig.yaml
@@ -21,5 +21,6 @@
 "hdfsConfigResources": "core-site.xml",
 "directory": "/foo/bar",
 "filenamePrefix": "prefix",
-"compression": "SNAPPY"
+"compression": "SNAPPY",
+"subdirectoryPattern": "yyyy-MM-dd"
 }
\ No newline at end of file
diff --git a/site2/docs/io-hdfs2-sink.md b/site2/docs/io-hdfs2-sink.md
index 56c4c7b..cbeb418 100644
--- a/site2/docs/io-hdfs2-sink.md
+++ b/site2/docs/io-hdfs2-sink.md
@@ -26,6 +26,7 @@ The configuration of the HDFS2 sink connector has the 
following properties.
 | `separator` | char|false |None |The character used to separate records in a 
text file. <br/><br/>If no value is provided, the contents from all records are 
concatenated together in one continuous byte array. |
 | `syncInterval` | long| false |0| The interval between calls to flush data to 
HDFS disk in milliseconds. |
 | `maxPendingRecords` |int| false|Integer.MAX_VALUE |  The maximum number of 
records that hold in memory before acking. <br/><br/>Setting this property to 1 
makes every record send to disk before the record is acked.<br/><br/>Setting 
this property to a higher value allows buffering records before flushing them 
to disk. 
+| `subdirectoryPattern` | String | false | None | A subdirectory associated 
with the created time of the sink.<br/>The pattern is the formatted pattern of 
`directory`'s subdirectory.<br/><br/>See 
[DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html)
 for pattern's syntax. |
 
 ### Example
 
@@ -39,7 +40,8 @@ Before using the HDFS2 sink connector, you need to create a 
configuration file t
         "directory": "/foo/bar",
         "filenamePrefix": "prefix",
         "fileExtension": ".log",
-        "compression": "SNAPPY"
+        "compression": "SNAPPY",
+        "subdirectoryPattern": "yyyy-MM-dd"
     }
     ```
 
@@ -52,4 +54,5 @@ Before using the HDFS2 sink connector, you need to create a 
configuration file t
         filenamePrefix: "prefix"
         fileExtension: ".log"
         compression: "SNAPPY"
+        subdirectoryPattern: "yyyy-MM-dd"
     ```

Reply via email to