This is an automated email from the ASF dual-hosted git repository.

kuyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new a88add0  [GOBBLIN-1096] Work with DST change in compaction watermark
a88add0 is described below

commit a88add02b67aad6949a6b1d5b1a92bfb870b8c56
Author: zhchen <zhc...@linkedin.com>
AuthorDate: Wed Mar 25 13:38:20 2020 -0700

    [GOBBLIN-1096] Work with DST change in compaction watermark
    
    Closes #2937 from zxcware/dst
---
 .../action/CompactionWatermarkAction.java          |  7 +++-
 .../action/CompactionWatermarkActionTest.java      | 48 ++++++++++++++++++++++
 2 files changed, 53 insertions(+), 2 deletions(-)

diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionWatermarkAction.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionWatermarkAction.java
index 2c6e1c2..cd3d98d 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionWatermarkAction.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionWatermarkAction.java
@@ -28,6 +28,7 @@ import com.google.common.base.Optional;
 
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.compaction.mapreduce.MRCompactor;
 import org.apache.gobblin.compaction.parser.CompactionPathParser;
 import org.apache.gobblin.compaction.verify.CompactionWatermarkChecker;
 import org.apache.gobblin.configuration.State;
@@ -56,11 +57,13 @@ public class CompactionWatermarkAction implements 
CompactionCompleteAction<FileS
   private State state;
   private final String defaultHiveDb;
   private final TimeIterator.Granularity granularity;
+  private final ZoneId zone;
 
   public CompactionWatermarkAction(State state) {
     this.state = state;
     defaultHiveDb = state.getProp(DEFAULT_HIVE_DB);
     granularity = 
TimeIterator.Granularity.valueOf(state.getProp(GRANULARITY).toUpperCase());
+    zone = ZoneId.of(state.getProp(MRCompactor.COMPACTION_TIMEZONE, 
MRCompactor.DEFAULT_COMPACTION_TIMEZONE));
   }
 
   @Override
@@ -138,8 +141,8 @@ public class CompactionWatermarkAction implements 
CompactionCompleteAction<FileS
    * unit of {@link #granularity}
    */
   private long getExpectedNextWatermark(Long previousWatermark) {
-    ZonedDateTime previousWatermarkTime = 
ZonedDateTime.ofInstant(Instant.ofEpochMilli(previousWatermark),
-        ZoneId.systemDefault());
+    ZonedDateTime previousWatermarkTime = 
ZonedDateTime.ofInstant(Instant.ofEpochMilli(previousWatermark), zone);
+    // java.time (available since Java 1.8) accounts for DST changes, e.g. in the 
America/Los_Angeles time zone
     ZonedDateTime nextWatermarkTime = TimeIterator.inc(previousWatermarkTime, 
granularity, 1);
     return nextWatermarkTime.toInstant().toEpochMilli();
   }
diff --git 
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/action/CompactionWatermarkActionTest.java
 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/action/CompactionWatermarkActionTest.java
index 038268c..16f33d6 100644
--- 
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/action/CompactionWatermarkActionTest.java
+++ 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/action/CompactionWatermarkActionTest.java
@@ -101,6 +101,54 @@ public class CompactionWatermarkActionTest {
     doWatermarkTest(action, fsDataset, state, actualWatermark, 
actualWatermark);
   }
 
+  @Test
+  public void testWatermarkWithDST() throws Exception {
+    // Test case 1
+    // Time zone: PST(America/Los_Angeles)
+    // Existing watermark millis: 1583654399999 (2020-03-07 23:59:59.999 PST)
+    // Actual watermark millis: 1583737199999 (2020-03-08 23:59:59.999 PDT) 
after the DST change
+    testWatermarkWithDSTTimeZone("America/Los_Angeles", "1583654399999", 
"1583737199999");
+    // Test case 2
+    // Time zone: UTC
+    // Existing watermark millis: 1583625599999 (2020-03-07 23:59:59.999 UTC)
+    // Actual watermark millis: 1583711999999 (2020-03-08 23:59:59.999 UTC)
+    testWatermarkWithDSTTimeZone("UTC", "1583625599999", "1583711999999");
+  }
+
+  private void testWatermarkWithDSTTimeZone(String timeZone, String 
existingWatermark, String actualWatermark)
+      throws Exception {
+    String db = "db1";
+    String table = "table1";
+    String dataset = "db1/table1";
+    State state = new State();
+    String defaultDb = "tracking";
+    state.setProp(CompactionWatermarkAction.DEFAULT_HIVE_DB, defaultDb);
+
+    String inputDir = "/data/tracking";
+    String inputSubDir = "hourly";
+    String destSubDir = "daily";
+    String datasetPath = String.format("%s/%s/%s/2020/03/08", inputDir, 
dataset, inputSubDir);
+    state.setProp(MRCompactor.COMPACTION_INPUT_DIR, inputDir);
+    state.setProp(MRCompactor.COMPACTION_DEST_DIR, inputDir);
+    state.setProp(MRCompactor.COMPACTION_INPUT_SUBDIR, inputSubDir);
+    state.setProp(MRCompactor.COMPACTION_DEST_SUBDIR, destSubDir);
+    state.setProp(HiveRegister.HIVE_REGISTER_TYPE, 
MockHiveRegister.class.getName());
+    state.setProp(CompactionWatermarkAction.GRANULARITY, "DAY");
+    state.setProp(MRCompactor.COMPACTION_TIMEZONE, timeZone);
+
+    State tableProps = new State();
+    tableProps.setProp(compactionWatermark, existingWatermark);
+    tableProps.setProp(completionCompactionWatermark, existingWatermark);
+    HiveTable existingTable = new 
HiveTable.Builder().withDbName(db).withTableName(table)
+        .withProps(tableProps).build();
+    MockHiveRegister.existingTable = existingTable;
+
+    CompactionWatermarkAction action = new CompactionWatermarkAction(state);
+    FileSystemDataset fsDataset = new SimpleFileSystemDataset(new 
Path(datasetPath));
+
+    doWatermarkTest(action, fsDataset, state, actualWatermark, 
actualWatermark);
+  }
+
   private void doWatermarkTest(CompactionWatermarkAction action, 
FileSystemDataset fsDataset,
       State state, String actualWatermark, String expectedWatermark)
       throws Exception {

Reply via email to