This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 39239fa  [GOBBLIN-769] Support string record timestamp in TimeBasedAvroWriterPartitioner
39239fa is described below

commit 39239fabeac25c5cd5e7fcd95f2e4ee9624be81d
Author: zhchen <[email protected]>
AuthorDate: Tue May 14 10:40:32 2019 -0700

    [GOBBLIN-769] Support string record timestamp in TimeBasedAvroWriterPartitioner
    
    Closes #2632 from zxcware/sp
---
 .../TimeBasedAvroWriterPartitioner.java            |  28 +++++-
 .../partitioner/TimeBasedWriterPartitioner.java    |   2 +-
 .../TimeBasedAvroWriterPartitionerTest.java        | 107 ++++++++++++++-------
 3 files changed, 97 insertions(+), 40 deletions(-)

diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedAvroWriterPartitioner.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedAvroWriterPartitioner.java
index 824d107..5ee1975 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedAvroWriterPartitioner.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedAvroWriterPartitioner.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.writer.partitioner;
 
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.avro.generic.GenericRecord;
 
@@ -46,8 +47,11 @@ import org.apache.gobblin.util.ForkOperatorUtils;
 public class TimeBasedAvroWriterPartitioner extends 
TimeBasedWriterPartitioner<GenericRecord> {
 
   public static final String WRITER_PARTITION_COLUMNS = 
ConfigurationKeys.WRITER_PREFIX + ".partition.columns";
+  public static final String WRITER_PARTITION_ENABLE_PARSE_AS_STRING =
+      ConfigurationKeys.WRITER_PREFIX + ".partition.enableParseAsString";
 
   private final Optional<List<String>> partitionColumns;
+  private final boolean enableParseAsString;
 
   public TimeBasedAvroWriterPartitioner(State state) {
     this(state, 1, 0);
@@ -56,6 +60,8 @@ public class TimeBasedAvroWriterPartitioner extends 
TimeBasedWriterPartitioner<G
   public TimeBasedAvroWriterPartitioner(State state, int numBranches, int 
branchId) {
     super(state, numBranches, branchId);
     this.partitionColumns = getWriterPartitionColumns(state, numBranches, 
branchId);
+    this.enableParseAsString = getEnableParseAsString(state, numBranches, 
branchId);
+    log.info("Enable parse as string: {}", this.enableParseAsString);
   }
 
   private static Optional<List<String>> getWriterPartitionColumns(State state, 
int numBranches, int branchId) {
@@ -65,6 +71,12 @@ public class TimeBasedAvroWriterPartitioner extends 
TimeBasedWriterPartitioner<G
     return state.contains(propName) ? 
Optional.of(state.getPropAsList(propName)) : Optional.<List<String>> absent();
   }
 
+  private static boolean getEnableParseAsString(State state, int numBranches, 
int branchId) {
+    String propName = 
ForkOperatorUtils.getPropertyNameForBranch(WRITER_PARTITION_ENABLE_PARSE_AS_STRING,
+        numBranches, branchId);
+    return state.getPropAsBoolean(propName, false);
+  }
+
   @Override
   public long getRecordTimestamp(GenericRecord record) {
     return getRecordTimestamp(getWriterPartitionColumnValue(record));
@@ -73,9 +85,19 @@ public class TimeBasedAvroWriterPartitioner extends 
TimeBasedWriterPartitioner<G
   /**
    *  Check if the partition column value is present and is a Long object. 
Otherwise, use current system time.
    */
-  private static long getRecordTimestamp(Optional<Object> 
writerPartitionColumnValue) {
-    return writerPartitionColumnValue.orNull() instanceof Long ? (Long) 
writerPartitionColumnValue.get()
-        : System.currentTimeMillis();
+  private long getRecordTimestamp(Optional<Object> writerPartitionColumnValue) 
{
+
+    if (writerPartitionColumnValue.isPresent()) {
+      Object val = writerPartitionColumnValue.get();
+      if (val instanceof Long) {
+        return (Long) val;
+      } else if (enableParseAsString) {
+        return Long.parseLong(val.toString());
+      }
+    }
+
+    // Default to current time
+    return timeUnit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
   }
 
   /**
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedWriterPartitioner.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedWriterPartitioner.java
index 816a0ae..5ec2c10 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedWriterPartitioner.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedWriterPartitioner.java
@@ -74,7 +74,7 @@ public abstract class TimeBasedWriterPartitioner<D> 
implements WriterPartitioner
   private final String writerPartitionSuffix;
   private final DatePartitionType granularity;
   private final DateTimeZone timeZone;
-  private final TimeUnit timeUnit;
+  protected final TimeUnit timeUnit;
   private final Optional<DateTimeFormatter> timestampToPathFormatter;
   private final Schema schema;
 
diff --git 
a/gobblin-core/src/test/java/org/apache/gobblin/writer/partitioner/TimeBasedAvroWriterPartitionerTest.java
 
b/gobblin-core/src/test/java/org/apache/gobblin/writer/partitioner/TimeBasedAvroWriterPartitionerTest.java
index bcc5905..eb2ddc0 100644
--- 
a/gobblin-core/src/test/java/org/apache/gobblin/writer/partitioner/TimeBasedAvroWriterPartitionerTest.java
+++ 
b/gobblin-core/src/test/java/org/apache/gobblin/writer/partitioner/TimeBasedAvroWriterPartitionerTest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.util.Utf8;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.Path;
 
@@ -58,12 +59,6 @@ public class TimeBasedAvroWriterPartitionerTest {
   private static final String PARTITION_COLUMN_NAME = "timestamp";
   private static final String WRITER_ID = "writer-1";
 
-  private static final String AVRO_SCHEMA =
-      "{" + "\"type\" : \"record\"," + "\"name\" : \"User\"," + "\"namespace\" 
: \"example.avro\"," + "\"fields\" : [ {"
-          + "\"name\" : \"" + PARTITION_COLUMN_NAME + "\"," + "\"type\" : 
\"long\"" + "} ]" + "}";
-
-  private Schema schema;
-
   @BeforeClass
   public void setUp() throws IOException {
     File stagingDir = new File(STAGING_DIR);
@@ -80,42 +75,22 @@ public class TimeBasedAvroWriterPartitionerTest {
     } else {
       FileUtils.deleteDirectory(outputDir);
     }
-
-    this.schema = new Schema.Parser().parse(AVRO_SCHEMA);
-  }
-
-  private State getBasicState() {
-    State properties = new State();
-    
properties.setProp(TimeBasedAvroWriterPartitioner.WRITER_PARTITION_COLUMNS, 
PARTITION_COLUMN_NAME);
-    properties.setProp(ConfigurationKeys.WRITER_BUFFER_SIZE, 
ConfigurationKeys.DEFAULT_BUFFER_SIZE);
-    properties.setProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, 
ConfigurationKeys.LOCAL_FS_URI);
-    properties.setProp(ConfigurationKeys.WRITER_STAGING_DIR, STAGING_DIR);
-    properties.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, OUTPUT_DIR);
-    properties.setProp(ConfigurationKeys.WRITER_FILE_PATH, BASE_FILE_PATH);
-    properties.setProp(ConfigurationKeys.WRITER_FILE_NAME, FILE_NAME);
-    properties.setProp(TimeBasedWriterPartitioner.WRITER_PARTITION_PATTERN, 
"yyyy/MM/dd");
-    properties.setProp(ConfigurationKeys.WRITER_PARTITIONER_CLASS, 
TimeBasedAvroWriterPartitioner.class.getName());
-
-    return properties;
-  }
-
-  private DataWriter<GenericRecord> getWriter(State state)
-      throws IOException {
-    // Build a writer to write test records
-    DataWriterBuilder<Schema, GenericRecord> builder = new 
AvroDataWriterBuilder()
-        .writeTo(Destination.of(Destination.DestinationType.HDFS, 
state)).writeInFormat(WriterOutputFormat.AVRO)
-        
.withWriterId(WRITER_ID).withSchema(this.schema).withBranches(1).forBranch(0);
-    return new PartitionedDataWriter<Schema, GenericRecord>(builder, state);
   }
 
+  /**
+   * Test
+   *  1. Record timestamp of type long
+   *  2. Partition path of a given record
+   */
   @Test
   public void testWriter() throws IOException {
 
+    Schema schema = getRecordSchema("long");
+    State state = getBasicState();
     // Write three records, each should be written to a different file
-    GenericRecordBuilder genericRecordBuilder = new 
GenericRecordBuilder(this.schema);
+    GenericRecordBuilder genericRecordBuilder = new 
GenericRecordBuilder(schema);
 
-    State state = getBasicState();
-    DataWriter<GenericRecord> millisPartitionWriter = getWriter(state);
+    DataWriter<GenericRecord> millisPartitionWriter = getWriter(schema, state);
 
     // This timestamp corresponds to 2015/01/01
     genericRecordBuilder.set("timestamp", 1420099200000l);
@@ -131,7 +106,7 @@ public class TimeBasedAvroWriterPartitionerTest {
     Assert.assertEquals(millisPartitionWriter.recordsWritten(), 2);
 
     state.setProp(TimeBasedWriterPartitioner.WRITER_PARTITION_TIMEUNIT, 
"seconds");
-    DataWriter<GenericRecord> secsPartitionWriter = getWriter(state);
+    DataWriter<GenericRecord> secsPartitionWriter = getWriter(schema, state);
     // This timestamp corresponds to 2015/01/03
     genericRecordBuilder.set("timestamp", 1420272000L);
     secsPartitionWriter.writeEnvelope(new 
RecordEnvelope<>(genericRecordBuilder.build()));
@@ -160,8 +135,68 @@ public class TimeBasedAvroWriterPartitionerTest {
     Assert.assertTrue(outputDir20150103.exists());
   }
 
+  @Test
+  public void testGetRecordTimestamp() {
+
+    // Test for string record timestamp in millis partition time unit
+    State state = getBasicState();
+    TimeBasedAvroWriterPartitioner partitioner = new 
TimeBasedAvroWriterPartitioner(state);
+    GenericRecordBuilder genericRecordBuilder = new 
GenericRecordBuilder(getRecordSchema("string"));
+    genericRecordBuilder.set("timestamp", "1557786583000");
+    // Test without parsing as string
+    
Assert.assertTrue(partitioner.getRecordTimestamp(genericRecordBuilder.build()) 
> 1557786583000L);
+
+    // Test with parsing as string
+    
state.setProp(TimeBasedAvroWriterPartitioner.WRITER_PARTITION_ENABLE_PARSE_AS_STRING,
 true);
+    partitioner = new TimeBasedAvroWriterPartitioner(state);
+    
Assert.assertEquals(partitioner.getRecordTimestamp(genericRecordBuilder.build()),
 1557786583000L);
+    // Test for Utf8
+    genericRecordBuilder.set("timestamp", new Utf8("1557786583000"));
+    
Assert.assertEquals(partitioner.getRecordTimestamp(genericRecordBuilder.build()),
 1557786583000L);
+    // Test for null value
+    genericRecordBuilder.set("timestamp", null);
+    Assert.assertTrue(
+        partitioner.getRecordTimestamp(genericRecordBuilder.build()) <= 
System.currentTimeMillis());
+
+    // Test for string type in seconds partition time unit
+    state.setProp(TimeBasedWriterPartitioner.WRITER_PARTITION_TIMEUNIT, 
"seconds");
+    partitioner = new TimeBasedAvroWriterPartitioner(state);
+    genericRecordBuilder.set("timestamp", "1557786583");
+    
Assert.assertEquals(partitioner.getRecordTimestamp(genericRecordBuilder.build()),
 1557786583L);
+  }
+
   @AfterClass
   public void tearDown() throws IOException {
     FileUtils.deleteDirectory(new File(TEST_ROOT_DIR));
   }
+
+  private DataWriter<GenericRecord> getWriter(Schema schema, State state)
+      throws IOException {
+    // Build a writer to write test records
+    DataWriterBuilder<Schema, GenericRecord> builder = new 
AvroDataWriterBuilder()
+        .writeTo(Destination.of(Destination.DestinationType.HDFS, 
state)).writeInFormat(WriterOutputFormat.AVRO)
+        
.withWriterId(WRITER_ID).withSchema(schema).withBranches(1).forBranch(0);
+    return new PartitionedDataWriter<Schema, GenericRecord>(builder, state);
+  }
+
+  private State getBasicState() {
+    State properties = new State();
+
+    
properties.setProp(TimeBasedAvroWriterPartitioner.WRITER_PARTITION_COLUMNS, 
PARTITION_COLUMN_NAME);
+    properties.setProp(ConfigurationKeys.WRITER_BUFFER_SIZE, 
ConfigurationKeys.DEFAULT_BUFFER_SIZE);
+    properties.setProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, 
ConfigurationKeys.LOCAL_FS_URI);
+    properties.setProp(ConfigurationKeys.WRITER_STAGING_DIR, STAGING_DIR);
+    properties.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, OUTPUT_DIR);
+    properties.setProp(ConfigurationKeys.WRITER_FILE_PATH, BASE_FILE_PATH);
+    properties.setProp(ConfigurationKeys.WRITER_FILE_NAME, FILE_NAME);
+    properties.setProp(TimeBasedWriterPartitioner.WRITER_PARTITION_PATTERN, 
"yyyy/MM/dd");
+    properties.setProp(ConfigurationKeys.WRITER_PARTITIONER_CLASS, 
TimeBasedAvroWriterPartitioner.class.getName());
+
+    return properties;
+  }
+
+  private Schema getRecordSchema(String timestampType) {
+    return new Schema.Parser().parse("{" + "\"type\" : \"record\"," + 
"\"name\" : \"User\"," + "\"namespace\" : \"example.avro\"," + "\"fields\" : [ 
{"
+        + "\"name\" : \"" + PARTITION_COLUMN_NAME + "\"," + "\"type\" : 
[\"null\", \"" + timestampType + "\"]} ]" + "}");
+  }
 }

Reply via email to