This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new b92f516  [GOBBLIN-915] Allow user customize the Extract timezone.
b92f516 is described below

commit b92f516a4e7a9b2c8bd2e90d71026fb07cb56da7
Author: Kuai Yu <[email protected]>
AuthorDate: Thu Oct 17 12:50:20 2019 -0700

    [GOBBLIN-915] Allow user customize the Extract timezone.
    
    Closes #2768 from yukuai518/tmzone
---
 .../org/apache/gobblin/configuration/ConfigurationKeys.java |  3 +++
 .../java/org/apache/gobblin/source/workunit/Extract.java    |  7 ++-----
 .../apache/gobblin/source/workunit/TimeZoneUtilsTest.java   |  8 +++-----
 .../gobblin/source/extractor/filebased/FileBasedSource.java | 13 +++++++++++--
 4 files changed, 19 insertions(+), 12 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index c6572a9..bdaa910 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -302,6 +302,9 @@ public class ConfigurationKeys {
   public static final String EXTRACT_SCHEMA = "extract.schema";
   public static final String EXTRACT_LIMIT_ENABLED_KEY = 
"extract.limit.enabled";
   public static final boolean DEFAULT_EXTRACT_LIMIT_ENABLED = false;
+  public static final String EXTRACT_ID_TIME_ZONE = 
"extract.extractIdTimeZone";
+  public static final String DEFAULT_EXTRACT_ID_TIME_ZONE = "UTC";
+
 
   /**
    * Converter configuration properties.
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/Extract.java 
b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/Extract.java
index a15a9ff..6a4e9ec 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/Extract.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/Extract.java
@@ -50,9 +50,6 @@ import org.apache.gobblin.configuration.WorkUnitState;
  */
 public class Extract extends State {
 
-  static final String EXTRACT_ID_TIME_ZONE = "extract.extractIdTimeZone";
-  static final DateTimeZone DEFAULT_EXTRACT_ID_TIME_ZONE = DateTimeZone.UTC;
-
   public enum TableType {
     SNAPSHOT_ONLY,
     SNAPSHOT_APPEND,
@@ -104,8 +101,8 @@ public class Extract extends State {
   }
 
   DateTimeZone getTimeZoneHelper(SourceState state) {
-    return state.contains(EXTRACT_ID_TIME_ZONE) ? 
DateTimeZone.forID(state.getProp(EXTRACT_ID_TIME_ZONE))
-        : DEFAULT_EXTRACT_ID_TIME_ZONE;
+    return 
DateTimeZone.forID(state.getProp(ConfigurationKeys.EXTRACT_ID_TIME_ZONE,
+            ConfigurationKeys.DEFAULT_EXTRACT_ID_TIME_ZONE));
   }
 
   /**
diff --git 
a/gobblin-api/src/test/java/org/apache/gobblin/source/workunit/TimeZoneUtilsTest.java
 
b/gobblin-api/src/test/java/org/apache/gobblin/source/workunit/TimeZoneUtilsTest.java
index 92e6aae..9d83c61 100644
--- 
a/gobblin-api/src/test/java/org/apache/gobblin/source/workunit/TimeZoneUtilsTest.java
+++ 
b/gobblin-api/src/test/java/org/apache/gobblin/source/workunit/TimeZoneUtilsTest.java
@@ -20,25 +20,23 @@ package org.apache.gobblin.source.workunit;
 import java.time.ZoneId;
 import java.util.TimeZone;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
 import org.joda.time.DateTimeZone;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import static org.apache.gobblin.source.workunit.Extract.EXTRACT_ID_TIME_ZONE;
-
-
 public class TimeZoneUtilsTest {
   @Test
   public void testConfigurableTimeZone()
       throws Exception {
     SourceState state = new SourceState();
-    state.setProp(EXTRACT_ID_TIME_ZONE, "America/Los_Angeles");
+    state.setProp(ConfigurationKeys.EXTRACT_ID_TIME_ZONE, 
"America/Los_Angeles");
     Extract extract = new Extract(state, Extract.TableType.APPEND_ONLY, 
"random", "table");
     Assert.assertEquals(extract.getTimeZoneHelper(state).toTimeZone(),
         TimeZone.getTimeZone(ZoneId.of("America/Los_Angeles")));
 
-    state.removeProp(EXTRACT_ID_TIME_ZONE);
+    state.removeProp(ConfigurationKeys.EXTRACT_ID_TIME_ZONE);
     extract = new Extract(state, Extract.TableType.APPEND_ONLY, "random", 
"table");
     Assert.assertEquals(extract.getTimeZoneHelper(state), DateTimeZone.UTC);
   }
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
index 14730e8..1e5bdc3 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
@@ -183,8 +183,17 @@ public abstract class FileBasedSource<S, D> extends 
AbstractSource<S, D> {
 
       // Distribute the files across the workunits
       for (int fileOffset = 0; fileOffset < filesToPull.size(); fileOffset += 
filesPerPartition) {
-        // Use extract table name to create extract
-        Extract extract = new Extract(tableType, nameSpaceName, 
extractTableName);
+        /* Use extract table name to create extract
+         *
+         * We don't want to pass in the whole SourceState object, to avoid any
+         * side effects, because the constructor with a state argument has been
+         * deprecated for a long time. Here we selectively choose the
+         * configuration needed by the Extract constructor to manually form a
+         * source state.
+         */
+        SourceState extractState = new SourceState();
+        extractState.setProp(ConfigurationKeys.EXTRACT_ID_TIME_ZONE,
+                state.getProp(ConfigurationKeys.EXTRACT_ID_TIME_ZONE, 
ConfigurationKeys.DEFAULT_EXTRACT_ID_TIME_ZONE));
+        Extract extract = new Extract(extractState, tableType, nameSpaceName, 
extractTableName);
+
         WorkUnit workUnit = WorkUnit.create(extract);
 
         // Eventually these setters should be integrated with framework 
support for generalized watermark handling

Reply via email to