This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new bca2e1f [GOBBLIN-767] Support different time units in TimeBasedWriterPartitioner
bca2e1f is described below
commit bca2e1f5166406ba938d62f4649878b6f6c0b9b6
Author: zhchen <[email protected]>
AuthorDate: Fri May 10 13:50:52 2019 -0700
[GOBBLIN-767] Support different time units in TimeBasedWriterPartitioner
Closes #2630 from zxcware/tp
---
.../TimeBasedAvroWriterPartitioner.java | 5 +++
.../partitioner/TimeBasedWriterPartitioner.java | 12 ++++++-
.../TimeBasedAvroWriterPartitionerTest.java | 41 ++++++++++++++--------
3 files changed, 43 insertions(+), 15 deletions(-)
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedAvroWriterPartitioner.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedAvroWriterPartitioner.java
index 54e5abc..824d107 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedAvroWriterPartitioner.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedAvroWriterPartitioner.java
@@ -23,6 +23,8 @@ import org.apache.avro.generic.GenericRecord;
import com.google.common.base.Optional;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.util.AvroUtils;
@@ -40,6 +42,7 @@ import org.apache.gobblin.util.ForkOperatorUtils;
*
* If a record contains none of the specified fields, or if no field is
specified, the current timestamp will be used.
*/
+@Slf4j
public class TimeBasedAvroWriterPartitioner extends
TimeBasedWriterPartitioner<GenericRecord> {
public static final String WRITER_PARTITION_COLUMNS =
ConfigurationKeys.WRITER_PREFIX + ".partition.columns";
@@ -57,6 +60,8 @@ public class TimeBasedAvroWriterPartitioner extends TimeBasedWriterPartitioner<G
+  /**
+   * Returns the branch-qualified {@link #WRITER_PARTITION_COLUMNS} value as a list, or
+   * {@link Optional#absent()} when that property is not set.
+   */
private static Optional<List<String>> getWriterPartitionColumns(State state,
int numBranches, int branchId) {
String propName =
ForkOperatorUtils.getPropertyNameForBranch(WRITER_PARTITION_COLUMNS,
numBranches, branchId);
+    // Logs the raw (unsplit) configured value; presumably null when the property is unset — confirm.
+ log.info("Partition columns for dataset {} are: {}",
state.getProp(ConfigurationKeys.DATASET_URN_KEY),
+ state.getProp(propName));
return state.contains(propName) ?
Optional.of(state.getPropAsList(propName)) : Optional.<List<String>> absent();
}
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedWriterPartitioner.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedWriterPartitioner.java
index cf35f1e..816a0ae 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedWriterPartitioner.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedWriterPartitioner.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.writer.partitioner;
import java.util.Collections;
+import java.util.Locale;
+import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
@@ -60,6 +61,8 @@ public abstract class TimeBasedWriterPartitioner<D> implements WriterPartitioner
public static final String WRITER_PARTITION_PATTERN =
ConfigurationKeys.WRITER_PREFIX + ".partition.pattern";
public static final String WRITER_PARTITION_TIMEZONE =
ConfigurationKeys.WRITER_PREFIX + ".partition.timezone";
public static final String DEFAULT_WRITER_PARTITION_TIMEZONE =
ConfigurationKeys.PST_TIMEZONE_NAME;
+ public static final String WRITER_PARTITION_TIMEUNIT =
ConfigurationKeys.WRITER_PREFIX + ".partition.timeUnit";
+ public static final String DEFAULT_WRITER_PARTITION_TIMEUNIT =
TimeUnit.MILLISECONDS.name();
public static final String WRITER_PARTITION_GRANULARITY =
ConfigurationKeys.WRITER_PREFIX + ".partition.granularity";
public static final DatePartitionType DEFAULT_WRITER_PARTITION_GRANULARITY =
DatePartitionType.HOUR;
@@ -71,6 +74,7 @@ public abstract class TimeBasedWriterPartitioner<D> implements WriterPartitioner
private final String writerPartitionSuffix;
private final DatePartitionType granularity;
private final DateTimeZone timeZone;
+ private final TimeUnit timeUnit;
private final Optional<DateTimeFormatter> timestampToPathFormatter;
private final Schema schema;
@@ -79,6 +83,7 @@ public abstract class TimeBasedWriterPartitioner<D> implements WriterPartitioner
this.writerPartitionSuffix = getWriterPartitionSuffix(state, numBranches,
branchId);
this.granularity = getGranularity(state, numBranches, branchId);
this.timeZone = getTimeZone(state, numBranches, branchId);
+ this.timeUnit = getTimeUnit(state, numBranches, branchId);
this.timestampToPathFormatter = getTimestampToPathFormatter(state,
numBranches, branchId);
this.schema = getSchema();
}
@@ -117,6 +122,11 @@ public abstract class TimeBasedWriterPartitioner<D> implements WriterPartitioner
return DateTimeZone.forID(state.getProp(propName,
DEFAULT_WRITER_PARTITION_TIMEZONE));
}
+  /**
+   * Resolves the {@link TimeUnit} in which record timestamps are expressed. The value is read from
+   * the branch-qualified {@link #WRITER_PARTITION_TIMEUNIT} property and defaults to
+   * {@link #DEFAULT_WRITER_PARTITION_TIMEUNIT}. Matching against the enum constant names is
+   * case-insensitive and ignores surrounding whitespace, so {@code "seconds"} resolves to
+   * {@link TimeUnit#SECONDS}.
+   *
+   * <p>NOTE(review): {@code partitionForRecord} converts every record timestamp with
+   * {@code timeUnit.toMillis(...)}. The TimeBasedAvroWriterPartitioner class doc says the current
+   * timestamp is used when a record has none of the partition fields. If that fallback is produced
+   * in milliseconds, it is mis-scaled whenever this unit is not MILLISECONDS — confirm.
+   *
+   * @param state job/task state holding the writer partition configuration
+   * @param numBranches total number of fork branches
+   * @param branchId branch whose property should be read
+   * @return the configured time unit
+   * @throws IllegalArgumentException if the configured value is not a {@link TimeUnit} constant name
+   */
+  private static TimeUnit getTimeUnit(State state, int numBranches, int branchId) {
+    String propName =
+        ForkOperatorUtils.getPropertyNameForBranch(WRITER_PARTITION_TIMEUNIT, numBranches, branchId);
+    String configured = state.getProp(propName, DEFAULT_WRITER_PARTITION_TIMEUNIT);
+    try {
+      // Locale.ROOT: under e.g. a Turkish default locale, "milliseconds".toUpperCase() yields a
+      // dotted capital I, which TimeUnit.valueOf rejects.
+      return TimeUnit.valueOf(configured.trim().toUpperCase(Locale.ROOT));
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException(String.format(
+          "Invalid value '%s' for %s; expected a java.util.concurrent.TimeUnit name such as MILLISECONDS or SECONDS",
+          configured, propName), e);
+    }
+  }
+
private Schema getSchema() {
if (this.timestampToPathFormatter.isPresent()) {
return getDateTimeFormatBasedSchema();
@@ -132,7 +142,7 @@ public abstract class TimeBasedWriterPartitioner<D> implements WriterPartitioner
@SuppressWarnings("fallthrough")
@Override
public GenericRecord partitionForRecord(D record) {
- long timestamp = getRecordTimestamp(record);
+ long timestamp = timeUnit.toMillis(getRecordTimestamp(record));
GenericRecord partition = new GenericData.Record(this.schema);
if (!Strings.isNullOrEmpty(this.writerPartitionPrefix)) {
partition.put(PREFIX, this.writerPartitionPrefix);
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/writer/partitioner/TimeBasedAvroWriterPartitionerTest.java b/gobblin-core/src/test/java/org/apache/gobblin/writer/partitioner/TimeBasedAvroWriterPartitionerTest.java
index e593aa1..bcc5905 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/writer/partitioner/TimeBasedAvroWriterPartitionerTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/writer/partitioner/TimeBasedAvroWriterPartitionerTest.java
@@ -63,7 +63,6 @@ public class TimeBasedAvroWriterPartitionerTest {
+ "\"name\" : \"" + PARTITION_COLUMN_NAME + "\"," + "\"type\" :
\"long\"" + "} ]" + "}";
private Schema schema;
- private DataWriter<GenericRecord> writer;
@BeforeClass
public void setUp() throws IOException {
@@ -83,7 +82,9 @@ public class TimeBasedAvroWriterPartitionerTest {
}
this.schema = new Schema.Parser().parse(AVRO_SCHEMA);
+ }
+ private State getBasicState() {
State properties = new State();
properties.setProp(TimeBasedAvroWriterPartitioner.WRITER_PARTITION_COLUMNS,
PARTITION_COLUMN_NAME);
properties.setProp(ConfigurationKeys.WRITER_BUFFER_SIZE,
ConfigurationKeys.DEFAULT_BUFFER_SIZE);
@@ -95,11 +96,16 @@ public class TimeBasedAvroWriterPartitionerTest {
properties.setProp(TimeBasedWriterPartitioner.WRITER_PARTITION_PATTERN,
"yyyy/MM/dd");
properties.setProp(ConfigurationKeys.WRITER_PARTITIONER_CLASS,
TimeBasedAvroWriterPartitioner.class.getName());
+ return properties;
+ }
+
+  /**
+   * Builds a {@link PartitionedDataWriter} around an Avro writer that targets an HDFS destination
+   * configured by {@code state}, using {@code this.schema} and a single branch (branch 0).
+   */
+ private DataWriter<GenericRecord> getWriter(State state)
+ throws IOException {
// Build a writer to write test records
DataWriterBuilder<Schema, GenericRecord> builder = new
AvroDataWriterBuilder()
- .writeTo(Destination.of(Destination.DestinationType.HDFS,
properties)).writeInFormat(WriterOutputFormat.AVRO)
+ .writeTo(Destination.of(Destination.DestinationType.HDFS,
state)).writeInFormat(WriterOutputFormat.AVRO)
.withWriterId(WRITER_ID).withSchema(this.schema).withBranches(1).forBranch(0);
- this.writer = new PartitionedDataWriter<Schema, GenericRecord>(builder,
properties);
+ return new PartitionedDataWriter<Schema, GenericRecord>(builder, state);
}
@Test
@@ -108,23 +114,31 @@ public class TimeBasedAvroWriterPartitionerTest {
// Write three records, each should be written to a different file
GenericRecordBuilder genericRecordBuilder = new
GenericRecordBuilder(this.schema);
+ State state = getBasicState();
+ DataWriter<GenericRecord> millisPartitionWriter = getWriter(state);
+
// This timestamp corresponds to 2015/01/01
genericRecordBuilder.set("timestamp", 1420099200000l);
- this.writer.writeEnvelope(new
RecordEnvelope<>(genericRecordBuilder.build()));
+ millisPartitionWriter.writeEnvelope(new
RecordEnvelope<>(genericRecordBuilder.build()));
// This timestamp corresponds to 2015/01/02
genericRecordBuilder.set("timestamp", 1420185600000l);
- this.writer.writeEnvelope(new
RecordEnvelope<>(genericRecordBuilder.build()));
+ millisPartitionWriter.writeEnvelope(new
RecordEnvelope<>(genericRecordBuilder.build()));
- // This timestamp corresponds to 2015/01/03
- genericRecordBuilder.set("timestamp", 1420272000000l);
- this.writer.writeEnvelope(new
RecordEnvelope<>(genericRecordBuilder.build()));
-
- // Check that the writer reports that 3 records have been written
- Assert.assertEquals(this.writer.recordsWritten(), 3);
+ millisPartitionWriter.close();
+ millisPartitionWriter.commit();
+ // Check that the writer reports that 2 records have been written
+ Assert.assertEquals(millisPartitionWriter.recordsWritten(), 2);
- this.writer.close();
- this.writer.commit();
+ state.setProp(TimeBasedWriterPartitioner.WRITER_PARTITION_TIMEUNIT,
"seconds");
+ DataWriter<GenericRecord> secsPartitionWriter = getWriter(state);
+ // This timestamp corresponds to 2015/01/03
+ genericRecordBuilder.set("timestamp", 1420272000L);
+ secsPartitionWriter.writeEnvelope(new
RecordEnvelope<>(genericRecordBuilder.build()));
+ secsPartitionWriter.close();
+ secsPartitionWriter.commit();
+ // Check that the writer reports that 1 record has been written
+ Assert.assertEquals(secsPartitionWriter.recordsWritten(), 1);
// Check that 3 files were created
Assert.assertEquals(FileUtils.listFiles(new File(TEST_ROOT_DIR), new
String[] { "avro" }, true).size(), 3);
@@ -148,7 +162,6 @@ public class TimeBasedAvroWriterPartitionerTest {
@AfterClass
public void tearDown() throws IOException {
- this.writer.close();
+    // Writers are now created and closed inside the test itself, so only the output directory is removed here.
FileUtils.deleteDirectory(new File(TEST_ROOT_DIR));
}
}