This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 39239fa [GOBBLIN-769] Support string record timestamp in TimeBasedAvroWriterPartitioner
39239fa is described below
commit 39239fabeac25c5cd5e7fcd95f2e4ee9624be81d
Author: zhchen <[email protected]>
AuthorDate: Tue May 14 10:40:32 2019 -0700
[GOBBLIN-769] Support string record timestamp in TimeBasedAvroWriterPartitioner
Closes #2632 from zxcware/sp
---
.../TimeBasedAvroWriterPartitioner.java | 28 +++++-
.../partitioner/TimeBasedWriterPartitioner.java | 2 +-
.../TimeBasedAvroWriterPartitionerTest.java | 107 ++++++++++++++-------
3 files changed, 97 insertions(+), 40 deletions(-)
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedAvroWriterPartitioner.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedAvroWriterPartitioner.java
index 824d107..5ee1975 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedAvroWriterPartitioner.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedAvroWriterPartitioner.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.writer.partitioner;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.apache.avro.generic.GenericRecord;
@@ -46,8 +47,11 @@ import org.apache.gobblin.util.ForkOperatorUtils;
public class TimeBasedAvroWriterPartitioner extends
TimeBasedWriterPartitioner<GenericRecord> {
public static final String WRITER_PARTITION_COLUMNS =
ConfigurationKeys.WRITER_PREFIX + ".partition.columns";
+ public static final String WRITER_PARTITION_ENABLE_PARSE_AS_STRING =
+ ConfigurationKeys.WRITER_PREFIX + ".partition.enableParseAsString";
private final Optional<List<String>> partitionColumns;
+ private final boolean enableParseAsString;
public TimeBasedAvroWriterPartitioner(State state) {
this(state, 1, 0);
@@ -56,6 +60,8 @@ public class TimeBasedAvroWriterPartitioner extends TimeBasedWriterPartitioner<G
public TimeBasedAvroWriterPartitioner(State state, int numBranches, int
branchId) {
super(state, numBranches, branchId);
this.partitionColumns = getWriterPartitionColumns(state, numBranches,
branchId);
+ this.enableParseAsString = getEnableParseAsString(state, numBranches,
branchId);
+ log.info("Enable parse as string: {}", this.enableParseAsString);
}
private static Optional<List<String>> getWriterPartitionColumns(State state,
int numBranches, int branchId) {
@@ -65,6 +71,12 @@ public class TimeBasedAvroWriterPartitioner extends TimeBasedWriterPartitioner<G
return state.contains(propName) ?
Optional.of(state.getPropAsList(propName)) : Optional.<List<String>> absent();
}
+ private static boolean getEnableParseAsString(State state, int numBranches,
int branchId) {
+ String propName =
ForkOperatorUtils.getPropertyNameForBranch(WRITER_PARTITION_ENABLE_PARSE_AS_STRING,
+ numBranches, branchId);
+ return state.getPropAsBoolean(propName, false);
+ }
+
@Override
public long getRecordTimestamp(GenericRecord record) {
return getRecordTimestamp(getWriterPartitionColumnValue(record));
@@ -73,9 +85,19 @@ public class TimeBasedAvroWriterPartitioner extends TimeBasedWriterPartitioner<G
/**
* Check if the partition column value is present and is a Long object.
Otherwise, use current system time.
*/
- private static long getRecordTimestamp(Optional<Object>
writerPartitionColumnValue) {
- return writerPartitionColumnValue.orNull() instanceof Long ? (Long)
writerPartitionColumnValue.get()
- : System.currentTimeMillis();
+ private long getRecordTimestamp(Optional<Object> writerPartitionColumnValue)
{
+
+ if (writerPartitionColumnValue.isPresent()) {
+ Object val = writerPartitionColumnValue.get();
+ if (val instanceof Long) {
+ return (Long) val;
+ } else if (enableParseAsString) {
+ return Long.parseLong(val.toString());
+ }
+ }
+
+ // Default to current time
+ return timeUnit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
/**
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedWriterPartitioner.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedWriterPartitioner.java
index 816a0ae..5ec2c10 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedWriterPartitioner.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedWriterPartitioner.java
@@ -74,7 +74,7 @@ public abstract class TimeBasedWriterPartitioner<D> implements WriterPartitioner
private final String writerPartitionSuffix;
private final DatePartitionType granularity;
private final DateTimeZone timeZone;
- private final TimeUnit timeUnit;
+ protected final TimeUnit timeUnit;
private final Optional<DateTimeFormatter> timestampToPathFormatter;
private final Schema schema;
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/writer/partitioner/TimeBasedAvroWriterPartitionerTest.java b/gobblin-core/src/test/java/org/apache/gobblin/writer/partitioner/TimeBasedAvroWriterPartitionerTest.java
index bcc5905..eb2ddc0 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/writer/partitioner/TimeBasedAvroWriterPartitionerTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/writer/partitioner/TimeBasedAvroWriterPartitionerTest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.util.Utf8;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
@@ -58,12 +59,6 @@ public class TimeBasedAvroWriterPartitionerTest {
private static final String PARTITION_COLUMN_NAME = "timestamp";
private static final String WRITER_ID = "writer-1";
- private static final String AVRO_SCHEMA =
- "{" + "\"type\" : \"record\"," + "\"name\" : \"User\"," + "\"namespace\"
: \"example.avro\"," + "\"fields\" : [ {"
- + "\"name\" : \"" + PARTITION_COLUMN_NAME + "\"," + "\"type\" :
\"long\"" + "} ]" + "}";
-
- private Schema schema;
-
@BeforeClass
public void setUp() throws IOException {
File stagingDir = new File(STAGING_DIR);
@@ -80,42 +75,22 @@ public class TimeBasedAvroWriterPartitionerTest {
} else {
FileUtils.deleteDirectory(outputDir);
}
-
- this.schema = new Schema.Parser().parse(AVRO_SCHEMA);
- }
-
- private State getBasicState() {
- State properties = new State();
-
properties.setProp(TimeBasedAvroWriterPartitioner.WRITER_PARTITION_COLUMNS,
PARTITION_COLUMN_NAME);
- properties.setProp(ConfigurationKeys.WRITER_BUFFER_SIZE,
ConfigurationKeys.DEFAULT_BUFFER_SIZE);
- properties.setProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI,
ConfigurationKeys.LOCAL_FS_URI);
- properties.setProp(ConfigurationKeys.WRITER_STAGING_DIR, STAGING_DIR);
- properties.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, OUTPUT_DIR);
- properties.setProp(ConfigurationKeys.WRITER_FILE_PATH, BASE_FILE_PATH);
- properties.setProp(ConfigurationKeys.WRITER_FILE_NAME, FILE_NAME);
- properties.setProp(TimeBasedWriterPartitioner.WRITER_PARTITION_PATTERN,
"yyyy/MM/dd");
- properties.setProp(ConfigurationKeys.WRITER_PARTITIONER_CLASS,
TimeBasedAvroWriterPartitioner.class.getName());
-
- return properties;
- }
-
- private DataWriter<GenericRecord> getWriter(State state)
- throws IOException {
- // Build a writer to write test records
- DataWriterBuilder<Schema, GenericRecord> builder = new
AvroDataWriterBuilder()
- .writeTo(Destination.of(Destination.DestinationType.HDFS,
state)).writeInFormat(WriterOutputFormat.AVRO)
-
.withWriterId(WRITER_ID).withSchema(this.schema).withBranches(1).forBranch(0);
- return new PartitionedDataWriter<Schema, GenericRecord>(builder, state);
}
+ /**
+ * Test
+ * 1. Record timestamp of type long
+ * 2. Partition path of a given record
+ */
@Test
public void testWriter() throws IOException {
+ Schema schema = getRecordSchema("long");
+ State state = getBasicState();
// Write three records, each should be written to a different file
- GenericRecordBuilder genericRecordBuilder = new
GenericRecordBuilder(this.schema);
+ GenericRecordBuilder genericRecordBuilder = new
GenericRecordBuilder(schema);
- State state = getBasicState();
- DataWriter<GenericRecord> millisPartitionWriter = getWriter(state);
+ DataWriter<GenericRecord> millisPartitionWriter = getWriter(schema, state);
// This timestamp corresponds to 2015/01/01
genericRecordBuilder.set("timestamp", 1420099200000l);
@@ -131,7 +106,7 @@ public class TimeBasedAvroWriterPartitionerTest {
Assert.assertEquals(millisPartitionWriter.recordsWritten(), 2);
state.setProp(TimeBasedWriterPartitioner.WRITER_PARTITION_TIMEUNIT,
"seconds");
- DataWriter<GenericRecord> secsPartitionWriter = getWriter(state);
+ DataWriter<GenericRecord> secsPartitionWriter = getWriter(schema, state);
// This timestamp corresponds to 2015/01/03
genericRecordBuilder.set("timestamp", 1420272000L);
secsPartitionWriter.writeEnvelope(new
RecordEnvelope<>(genericRecordBuilder.build()));
@@ -160,8 +135,68 @@ public class TimeBasedAvroWriterPartitionerTest {
Assert.assertTrue(outputDir20150103.exists());
}
+ @Test
+ public void testGetRecordTimestamp() {
+
+ // Test for string record timestamp in millis partition time unit
+ State state = getBasicState();
+ TimeBasedAvroWriterPartitioner partitioner = new
TimeBasedAvroWriterPartitioner(state);
+ GenericRecordBuilder genericRecordBuilder = new
GenericRecordBuilder(getRecordSchema("string"));
+ genericRecordBuilder.set("timestamp", "1557786583000");
+ // Test without parsing as string
+
Assert.assertTrue(partitioner.getRecordTimestamp(genericRecordBuilder.build())
> 1557786583000L);
+
+ // Test with parsing as string
+
state.setProp(TimeBasedAvroWriterPartitioner.WRITER_PARTITION_ENABLE_PARSE_AS_STRING,
true);
+ partitioner = new TimeBasedAvroWriterPartitioner(state);
+
Assert.assertEquals(partitioner.getRecordTimestamp(genericRecordBuilder.build()),
1557786583000L);
+ // Test for Utf8
+ genericRecordBuilder.set("timestamp", new Utf8("1557786583000"));
+
Assert.assertEquals(partitioner.getRecordTimestamp(genericRecordBuilder.build()),
1557786583000L);
+ // Test for null value
+ genericRecordBuilder.set("timestamp", null);
+ Assert.assertTrue(
+ partitioner.getRecordTimestamp(genericRecordBuilder.build()) <=
System.currentTimeMillis());
+
+ // Test for string type in seconds partition time unit
+ state.setProp(TimeBasedWriterPartitioner.WRITER_PARTITION_TIMEUNIT,
"seconds");
+ partitioner = new TimeBasedAvroWriterPartitioner(state);
+ genericRecordBuilder.set("timestamp", "1557786583");
+
Assert.assertEquals(partitioner.getRecordTimestamp(genericRecordBuilder.build()),
1557786583L);
+ }
+
@AfterClass
public void tearDown() throws IOException {
FileUtils.deleteDirectory(new File(TEST_ROOT_DIR));
}
+
+ private DataWriter<GenericRecord> getWriter(Schema schema, State state)
+ throws IOException {
+ // Build a writer to write test records
+ DataWriterBuilder<Schema, GenericRecord> builder = new
AvroDataWriterBuilder()
+ .writeTo(Destination.of(Destination.DestinationType.HDFS,
state)).writeInFormat(WriterOutputFormat.AVRO)
+
.withWriterId(WRITER_ID).withSchema(schema).withBranches(1).forBranch(0);
+ return new PartitionedDataWriter<Schema, GenericRecord>(builder, state);
+ }
+
+ private State getBasicState() {
+ State properties = new State();
+
+
properties.setProp(TimeBasedAvroWriterPartitioner.WRITER_PARTITION_COLUMNS,
PARTITION_COLUMN_NAME);
+ properties.setProp(ConfigurationKeys.WRITER_BUFFER_SIZE,
ConfigurationKeys.DEFAULT_BUFFER_SIZE);
+ properties.setProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI,
ConfigurationKeys.LOCAL_FS_URI);
+ properties.setProp(ConfigurationKeys.WRITER_STAGING_DIR, STAGING_DIR);
+ properties.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, OUTPUT_DIR);
+ properties.setProp(ConfigurationKeys.WRITER_FILE_PATH, BASE_FILE_PATH);
+ properties.setProp(ConfigurationKeys.WRITER_FILE_NAME, FILE_NAME);
+ properties.setProp(TimeBasedWriterPartitioner.WRITER_PARTITION_PATTERN,
"yyyy/MM/dd");
+ properties.setProp(ConfigurationKeys.WRITER_PARTITIONER_CLASS,
TimeBasedAvroWriterPartitioner.class.getName());
+
+ return properties;
+ }
+
+ private Schema getRecordSchema(String timestampType) {
+ return new Schema.Parser().parse("{" + "\"type\" : \"record\"," +
"\"name\" : \"User\"," + "\"namespace\" : \"example.avro\"," + "\"fields\" : [
{"
+ + "\"name\" : \"" + PARTITION_COLUMN_NAME + "\"," + "\"type\" :
[\"null\", \"" + timestampType + "\"]} ]" + "}");
+ }
}