This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new b92f516 [GOBBLIN-915] Allow user customize the Extract timezone.
b92f516 is described below
commit b92f516a4e7a9b2c8bd2e90d71026fb07cb56da7
Author: Kuai Yu <[email protected]>
AuthorDate: Thu Oct 17 12:50:20 2019 -0700
[GOBBLIN-915] Allow user customize the Extract timezone.
Closes #2768 from yukuai518/tmzone
---
.../org/apache/gobblin/configuration/ConfigurationKeys.java | 3 +++
.../java/org/apache/gobblin/source/workunit/Extract.java | 7 ++-----
.../apache/gobblin/source/workunit/TimeZoneUtilsTest.java | 8 +++-----
.../gobblin/source/extractor/filebased/FileBasedSource.java | 13 +++++++++++--
4 files changed, 19 insertions(+), 12 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index c6572a9..bdaa910 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -302,6 +302,9 @@ public class ConfigurationKeys {
public static final String EXTRACT_SCHEMA = "extract.schema";
public static final String EXTRACT_LIMIT_ENABLED_KEY =
"extract.limit.enabled";
public static final boolean DEFAULT_EXTRACT_LIMIT_ENABLED = false;
+ public static final String EXTRACT_ID_TIME_ZONE =
"extract.extractIdTimeZone";
+ public static final String DEFAULT_EXTRACT_ID_TIME_ZONE = "UTC";
+
/**
* Converter configuration properties.
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/Extract.java
b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/Extract.java
index a15a9ff..6a4e9ec 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/Extract.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/Extract.java
@@ -50,9 +50,6 @@ import org.apache.gobblin.configuration.WorkUnitState;
*/
public class Extract extends State {
- static final String EXTRACT_ID_TIME_ZONE = "extract.extractIdTimeZone";
- static final DateTimeZone DEFAULT_EXTRACT_ID_TIME_ZONE = DateTimeZone.UTC;
-
public enum TableType {
SNAPSHOT_ONLY,
SNAPSHOT_APPEND,
@@ -104,8 +101,8 @@ public class Extract extends State {
}
DateTimeZone getTimeZoneHelper(SourceState state) {
- return state.contains(EXTRACT_ID_TIME_ZONE) ?
DateTimeZone.forID(state.getProp(EXTRACT_ID_TIME_ZONE))
- : DEFAULT_EXTRACT_ID_TIME_ZONE;
+ return
DateTimeZone.forID(state.getProp(ConfigurationKeys.EXTRACT_ID_TIME_ZONE,
+ ConfigurationKeys.DEFAULT_EXTRACT_ID_TIME_ZONE));
}
/**
diff --git
a/gobblin-api/src/test/java/org/apache/gobblin/source/workunit/TimeZoneUtilsTest.java
b/gobblin-api/src/test/java/org/apache/gobblin/source/workunit/TimeZoneUtilsTest.java
index 92e6aae..9d83c61 100644
---
a/gobblin-api/src/test/java/org/apache/gobblin/source/workunit/TimeZoneUtilsTest.java
+++
b/gobblin-api/src/test/java/org/apache/gobblin/source/workunit/TimeZoneUtilsTest.java
@@ -20,25 +20,23 @@ package org.apache.gobblin.source.workunit;
import java.time.ZoneId;
import java.util.TimeZone;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
import org.joda.time.DateTimeZone;
import org.testng.Assert;
import org.testng.annotations.Test;
-import static org.apache.gobblin.source.workunit.Extract.EXTRACT_ID_TIME_ZONE;
-
-
public class TimeZoneUtilsTest {
@Test
public void testConfigurableTimeZone()
throws Exception {
SourceState state = new SourceState();
- state.setProp(EXTRACT_ID_TIME_ZONE, "America/Los_Angeles");
+ state.setProp(ConfigurationKeys.EXTRACT_ID_TIME_ZONE,
"America/Los_Angeles");
Extract extract = new Extract(state, Extract.TableType.APPEND_ONLY,
"random", "table");
Assert.assertEquals(extract.getTimeZoneHelper(state).toTimeZone(),
TimeZone.getTimeZone(ZoneId.of("America/Los_Angeles")));
- state.removeProp(EXTRACT_ID_TIME_ZONE);
+ state.removeProp(ConfigurationKeys.EXTRACT_ID_TIME_ZONE);
extract = new Extract(state, Extract.TableType.APPEND_ONLY, "random",
"table");
Assert.assertEquals(extract.getTimeZoneHelper(state), DateTimeZone.UTC);
}
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
index 14730e8..1e5bdc3 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
@@ -183,8 +183,17 @@ public abstract class FileBasedSource<S, D> extends
AbstractSource<S, D> {
// Distribute the files across the workunits
for (int fileOffset = 0; fileOffset < filesToPull.size(); fileOffset +=
filesPerPartition) {
- // Use extract table name to create extract
- Extract extract = new Extract(tableType, nameSpaceName,
extractTableName);
+ /* Use extract table name to create extract
+ *
+ * We don't want to pass in the whole SourceState object, to avoid
any side effects, because
+ * the constructor with a state argument has been deprecated for a long
time. Here we selectively
+ * choose the configuration needed by the Extract constructor and manually
form a source state.
+ */
+ SourceState extractState = new SourceState();
+ extractState.setProp(ConfigurationKeys.EXTRACT_ID_TIME_ZONE,
+ state.getProp(ConfigurationKeys.EXTRACT_ID_TIME_ZONE,
ConfigurationKeys.DEFAULT_EXTRACT_ID_TIME_ZONE));
+ Extract extract = new Extract(extractState, tableType, nameSpaceName,
extractTableName);
+
WorkUnit workUnit = WorkUnit.create(extract);
// Eventually these setters should be integrated with framework
support for generalized watermark handling