This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new bca2e1f  [GOBBLIN-767] Support different time units in TimeBasedWriterPartitioner
bca2e1f is described below

commit bca2e1f5166406ba938d62f4649878b6f6c0b9b6
Author: zhchen <[email protected]>
AuthorDate: Fri May 10 13:50:52 2019 -0700

    [GOBBLIN-767] Support different time units in TimeBasedWriterPartitioner
    
    Closes #2630 from zxcware/tp
---
 .../TimeBasedAvroWriterPartitioner.java            |  5 +++
 .../partitioner/TimeBasedWriterPartitioner.java    | 12 ++++++-
 .../TimeBasedAvroWriterPartitionerTest.java        | 41 ++++++++++++++--------
 3 files changed, 43 insertions(+), 15 deletions(-)

diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedAvroWriterPartitioner.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedAvroWriterPartitioner.java
index 54e5abc..824d107 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedAvroWriterPartitioner.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedAvroWriterPartitioner.java
@@ -23,6 +23,8 @@ import org.apache.avro.generic.GenericRecord;
 
 import com.google.common.base.Optional;
 
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.util.AvroUtils;
@@ -40,6 +42,7 @@ import org.apache.gobblin.util.ForkOperatorUtils;
  *
  * If a record contains none of the specified fields, or if no field is 
specified, the current timestamp will be used.
  */
+@Slf4j
 public class TimeBasedAvroWriterPartitioner extends 
TimeBasedWriterPartitioner<GenericRecord> {
 
   public static final String WRITER_PARTITION_COLUMNS = 
ConfigurationKeys.WRITER_PREFIX + ".partition.columns";
@@ -57,6 +60,8 @@ public class TimeBasedAvroWriterPartitioner extends TimeBasedWriterPartitioner<G
 
   private static Optional<List<String>> getWriterPartitionColumns(State state, 
int numBranches, int branchId) {
     String propName = 
ForkOperatorUtils.getPropertyNameForBranch(WRITER_PARTITION_COLUMNS, 
numBranches, branchId);
+    log.info("Partition columns for dataset {} are: {}", 
state.getProp(ConfigurationKeys.DATASET_URN_KEY),
+        state.getProp(propName));
     return state.contains(propName) ? 
Optional.of(state.getPropAsList(propName)) : Optional.<List<String>> absent();
   }
 
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedWriterPartitioner.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedWriterPartitioner.java
index cf35f1e..816a0ae 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedWriterPartitioner.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedWriterPartitioner.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.writer.partitioner;
 
 import java.util.Collections;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
@@ -60,6 +61,8 @@ public abstract class TimeBasedWriterPartitioner<D> implements WriterPartitioner
   public static final String WRITER_PARTITION_PATTERN = 
ConfigurationKeys.WRITER_PREFIX + ".partition.pattern";
   public static final String WRITER_PARTITION_TIMEZONE = 
ConfigurationKeys.WRITER_PREFIX + ".partition.timezone";
   public static final String DEFAULT_WRITER_PARTITION_TIMEZONE = 
ConfigurationKeys.PST_TIMEZONE_NAME;
+  public static final String WRITER_PARTITION_TIMEUNIT = 
ConfigurationKeys.WRITER_PREFIX + ".partition.timeUnit";
+  public static final String DEFAULT_WRITER_PARTITION_TIMEUNIT = 
TimeUnit.MILLISECONDS.name();
   public static final String WRITER_PARTITION_GRANULARITY = 
ConfigurationKeys.WRITER_PREFIX + ".partition.granularity";
   public static final DatePartitionType DEFAULT_WRITER_PARTITION_GRANULARITY = 
DatePartitionType.HOUR;
 
@@ -71,6 +74,7 @@ public abstract class TimeBasedWriterPartitioner<D> implements WriterPartitioner
   private final String writerPartitionSuffix;
   private final DatePartitionType granularity;
   private final DateTimeZone timeZone;
+  private final TimeUnit timeUnit;
   private final Optional<DateTimeFormatter> timestampToPathFormatter;
   private final Schema schema;
 
@@ -79,6 +83,7 @@ public abstract class TimeBasedWriterPartitioner<D> implements WriterPartitioner
     this.writerPartitionSuffix = getWriterPartitionSuffix(state, numBranches, 
branchId);
     this.granularity = getGranularity(state, numBranches, branchId);
     this.timeZone = getTimeZone(state, numBranches, branchId);
+    this.timeUnit = getTimeUnit(state, numBranches, branchId);
     this.timestampToPathFormatter = getTimestampToPathFormatter(state, 
numBranches, branchId);
     this.schema = getSchema();
   }
@@ -117,6 +122,11 @@ public abstract class TimeBasedWriterPartitioner<D> implements WriterPartitioner
     return DateTimeZone.forID(state.getProp(propName, 
DEFAULT_WRITER_PARTITION_TIMEZONE));
   }
 
+  private static TimeUnit getTimeUnit(State state, int numBranches, int 
branchId) {
+    String propName = 
ForkOperatorUtils.getPropertyNameForBranch(WRITER_PARTITION_TIMEUNIT, 
numBranches, branchId);
+    return TimeUnit.valueOf(state.getProp(propName, 
DEFAULT_WRITER_PARTITION_TIMEUNIT).toUpperCase());
+  }
+
   private Schema getSchema() {
     if (this.timestampToPathFormatter.isPresent()) {
       return getDateTimeFormatBasedSchema();
@@ -132,7 +142,7 @@ public abstract class TimeBasedWriterPartitioner<D> implements WriterPartitioner
   @SuppressWarnings("fallthrough")
   @Override
   public GenericRecord partitionForRecord(D record) {
-    long timestamp = getRecordTimestamp(record);
+    long timestamp = timeUnit.toMillis(getRecordTimestamp(record));
     GenericRecord partition = new GenericData.Record(this.schema);
     if (!Strings.isNullOrEmpty(this.writerPartitionPrefix)) {
       partition.put(PREFIX, this.writerPartitionPrefix);
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/writer/partitioner/TimeBasedAvroWriterPartitionerTest.java b/gobblin-core/src/test/java/org/apache/gobblin/writer/partitioner/TimeBasedAvroWriterPartitionerTest.java
index e593aa1..bcc5905 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/writer/partitioner/TimeBasedAvroWriterPartitionerTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/writer/partitioner/TimeBasedAvroWriterPartitionerTest.java
@@ -63,7 +63,6 @@ public class TimeBasedAvroWriterPartitionerTest {
           + "\"name\" : \"" + PARTITION_COLUMN_NAME + "\"," + "\"type\" : 
\"long\"" + "} ]" + "}";
 
   private Schema schema;
-  private DataWriter<GenericRecord> writer;
 
   @BeforeClass
   public void setUp() throws IOException {
@@ -83,7 +82,9 @@ public class TimeBasedAvroWriterPartitionerTest {
     }
 
     this.schema = new Schema.Parser().parse(AVRO_SCHEMA);
+  }
 
+  private State getBasicState() {
     State properties = new State();
     
properties.setProp(TimeBasedAvroWriterPartitioner.WRITER_PARTITION_COLUMNS, 
PARTITION_COLUMN_NAME);
     properties.setProp(ConfigurationKeys.WRITER_BUFFER_SIZE, 
ConfigurationKeys.DEFAULT_BUFFER_SIZE);
@@ -95,11 +96,16 @@ public class TimeBasedAvroWriterPartitionerTest {
     properties.setProp(TimeBasedWriterPartitioner.WRITER_PARTITION_PATTERN, 
"yyyy/MM/dd");
     properties.setProp(ConfigurationKeys.WRITER_PARTITIONER_CLASS, 
TimeBasedAvroWriterPartitioner.class.getName());
 
+    return properties;
+  }
+
+  private DataWriter<GenericRecord> getWriter(State state)
+      throws IOException {
     // Build a writer to write test records
     DataWriterBuilder<Schema, GenericRecord> builder = new 
AvroDataWriterBuilder()
-        .writeTo(Destination.of(Destination.DestinationType.HDFS, 
properties)).writeInFormat(WriterOutputFormat.AVRO)
+        .writeTo(Destination.of(Destination.DestinationType.HDFS, 
state)).writeInFormat(WriterOutputFormat.AVRO)
         
.withWriterId(WRITER_ID).withSchema(this.schema).withBranches(1).forBranch(0);
-    this.writer = new PartitionedDataWriter<Schema, GenericRecord>(builder, 
properties);
+    return new PartitionedDataWriter<Schema, GenericRecord>(builder, state);
   }
 
   @Test
@@ -108,23 +114,31 @@ public class TimeBasedAvroWriterPartitionerTest {
     // Write three records, each should be written to a different file
     GenericRecordBuilder genericRecordBuilder = new 
GenericRecordBuilder(this.schema);
 
+    State state = getBasicState();
+    DataWriter<GenericRecord> millisPartitionWriter = getWriter(state);
+
     // This timestamp corresponds to 2015/01/01
     genericRecordBuilder.set("timestamp", 1420099200000l);
-    this.writer.writeEnvelope(new 
RecordEnvelope<>(genericRecordBuilder.build()));
+    millisPartitionWriter.writeEnvelope(new 
RecordEnvelope<>(genericRecordBuilder.build()));
 
     // This timestamp corresponds to 2015/01/02
     genericRecordBuilder.set("timestamp", 1420185600000l);
-    this.writer.writeEnvelope(new 
RecordEnvelope<>(genericRecordBuilder.build()));
+    millisPartitionWriter.writeEnvelope(new 
RecordEnvelope<>(genericRecordBuilder.build()));
 
-    // This timestamp corresponds to 2015/01/03
-    genericRecordBuilder.set("timestamp", 1420272000000l);
-    this.writer.writeEnvelope(new 
RecordEnvelope<>(genericRecordBuilder.build()));
-
-    // Check that the writer reports that 3 records have been written
-    Assert.assertEquals(this.writer.recordsWritten(), 3);
+    millisPartitionWriter.close();
+    millisPartitionWriter.commit();
+    // Check that the writer reports that 2 records have been written
+    Assert.assertEquals(millisPartitionWriter.recordsWritten(), 2);
 
-    this.writer.close();
-    this.writer.commit();
+    state.setProp(TimeBasedWriterPartitioner.WRITER_PARTITION_TIMEUNIT, 
"seconds");
+    DataWriter<GenericRecord> secsPartitionWriter = getWriter(state);
+    // This timestamp corresponds to 2015/01/03
+    genericRecordBuilder.set("timestamp", 1420272000L);
+    secsPartitionWriter.writeEnvelope(new 
RecordEnvelope<>(genericRecordBuilder.build()));
+    secsPartitionWriter.close();
+    secsPartitionWriter.commit();
+    // Check that the writer reports that 1 record has been written
+    Assert.assertEquals(secsPartitionWriter.recordsWritten(), 1);
 
     // Check that 3 files were created
     Assert.assertEquals(FileUtils.listFiles(new File(TEST_ROOT_DIR), new 
String[] { "avro" }, true).size(), 3);
@@ -148,7 +162,6 @@ public class TimeBasedAvroWriterPartitionerTest {
 
   @AfterClass
   public void tearDown() throws IOException {
-    this.writer.close();
     FileUtils.deleteDirectory(new File(TEST_ROOT_DIR));
   }
 }

Reply via email to