This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new d18783d  [GOBBLIN-1012] Implement CompactionWithWatermarkSuite
d18783d is described below

commit d18783df7ab29b161cca59ca907fb47e1366b358
Author: zhchen <zhc...@linkedin.com>
AuthorDate: Fri Jan 3 10:59:20 2020 -0800

    [GOBBLIN-1012] Implement CompactionWithWatermarkSuite
    
    Closes #2857 from zxcware/comp3
---
 .../action/CompactionWatermarkAction.java          | 167 +++++++++++++++++
 .../compaction/mapreduce/MRCompactionTask.java     |   5 +-
 .../suite/CompactionWithWatermarkSuite.java        |  83 +++++++++
 .../suite/CompactionWithWatermarkSuiteFactory.java |  29 +++
 .../verify/CompactionWatermarkChecker.java         | 109 +++++++++++
 .../action/CompactionWatermarkActionTest.java      | 205 +++++++++++++++++++++
 .../verify/CompactionWatermarkCheckerTest.java     |  88 +++++++++
 .../dataset/TimePartitionGlobFinder.java           |   9 +-
 .../dataset/TimePartitionedGlobFinderTest.java     |   7 +
 9 files changed, 699 insertions(+), 3 deletions(-)

diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionWatermarkAction.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionWatermarkAction.java
new file mode 100644
index 0000000..2c6e1c2
--- /dev/null
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionWatermarkAction.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.action;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+
+import org.apache.commons.lang.StringUtils;
+
+import com.google.common.base.Optional;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.compaction.parser.CompactionPathParser;
+import org.apache.gobblin.compaction.verify.CompactionWatermarkChecker;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
+import org.apache.gobblin.dataset.FileSystemDataset;
+import org.apache.gobblin.hive.HiveRegister;
+import org.apache.gobblin.hive.HiveTable;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.time.TimeIterator;
+
+
+/**
+ * The class publishes compaction watermarks, reported by {@link CompactionWatermarkChecker}, as hive table
+ * parameters. It guarantees that the compaction watermark is updated continuously and errors out if there is
+ * a gap, which indicates a compaction hole. At the time of writing, one should manually fill the compaction
+ * hole and update the existing watermarks in hive table parameters to recover automatic watermark publishing.
+ */
+@Slf4j
+public class CompactionWatermarkAction implements 
CompactionCompleteAction<FileSystemDataset> {
+
+  public static final String CONF_PREFIX = "compactionWatermarkAction";
+  public static final String GRANULARITY = CONF_PREFIX + ".granularity";
+  public static final String DEFAULT_HIVE_DB = CONF_PREFIX + ".defaultHiveDb";
+
+  private EventSubmitter submitter;
+  private State state;
+  private final String defaultHiveDb;
+  private final TimeIterator.Granularity granularity;
+
+  /**
+   * Creates the action from the job {@code state}.
+   *
+   * @param state job state; {@link #GRANULARITY} must name a {@link TimeIterator.Granularity} constant
+   *              (case-insensitive), {@link #DEFAULT_HIVE_DB} is read as the fallback hive db
+   * @throws IllegalArgumentException if {@link #GRANULARITY} is missing or names no granularity constant
+   */
+  public CompactionWatermarkAction(State state) {
+    this.state = state;
+    defaultHiveDb = state.getProp(DEFAULT_HIVE_DB);
+    String granularityName = state.getProp(GRANULARITY);
+    if (StringUtils.isEmpty(granularityName)) {
+      throw new IllegalArgumentException("Missing required property " + GRANULARITY);
+    }
+    // Locale.ROOT keeps the enum lookup stable under default locales with special casing rules
+    // (e.g. "minute" upper-cases to "MİNUTE" in a Turkish locale)
+    granularity = TimeIterator.Granularity.valueOf(granularityName.toUpperCase(java.util.Locale.ROOT));
+  }
+
+  /**
+   * Publishes the watermarks found in the job state to the parameters of the hive table that
+   * corresponds to {@code dataset}.
+   *
+   * <p>Returns without doing anything when neither watermark is present in the state, or when the
+   * hive table can not be found. The table is altered only if at least one watermark was written
+   * into its properties.
+   *
+   * @param dataset the dataset whose compaction completed
+   * @throws IOException propagated from the hive register calls
+   */
+  @Override
+  public void onCompactionJobComplete(FileSystemDataset dataset)
+      throws IOException {
+    String compactionMark = state.getProp(CompactionWatermarkChecker.COMPACTION_WATERMARK);
+    String completionMark = state.getProp(CompactionWatermarkChecker.COMPLETION_COMPACTION_WATERMARK);
+    boolean nothingReported = StringUtils.isEmpty(compactionMark) && StringUtils.isEmpty(completionMark);
+    if (nothingReported) {
+      return;
+    }
+
+    CompactionPathParser.CompactionParserResult parsed = new CompactionPathParser(state).parse(dataset);
+    HiveDatasetFinder.DbAndTable target = extractDbTable(parsed.getDatasetName());
+    String hiveDb = target.getDb();
+    String hiveTable = target.getTable();
+
+    // NOTE(review): the register obtained here is never closed; if HiveRegister holds resources
+    // this may leak them -- confirm against HiveRegister
+    HiveRegister hiveRegister = HiveRegister.get(state);
+    Optional<HiveTable> lookup = hiveRegister.getTable(hiveDb, hiveTable);
+    if (!lookup.isPresent()) {
+      log.info("Table {}.{} not found. Skip publishing compaction watermarks", hiveDb, hiveTable);
+      return;
+    }
+
+    HiveTable table = lookup.get();
+    State tableProps = table.getProps();
+    // Both calls must run: each one may write its own watermark into tableProps
+    boolean compactionAdvanced =
+        mayUpdateWatermark(dataset, tableProps, CompactionWatermarkChecker.COMPACTION_WATERMARK, compactionMark);
+    boolean completionAdvanced = mayUpdateWatermark(dataset, tableProps,
+        CompactionWatermarkChecker.COMPLETION_COMPACTION_WATERMARK, completionMark);
+
+    if (compactionAdvanced || completionAdvanced) {
+      log.info("Alter table {}.{} to publish watermarks {}", hiveDb, hiveTable, tableProps);
+      hiveRegister.alterTable(table);
+    }
+  }
+
+  /**
+   * Writes {@code newValue} under {@code key} in {@code props} if it continuously advances the
+   * value already stored there.
+   *
+   * <p>When no value (or {@code 0}) is stored yet, {@code newValue} is accepted as is. Otherwise it
+   * must be exactly the stored value plus one unit of {@link #granularity}.
+   *
+   * @param dataset  only used to name the dataset in the error message
+   * @param props    the properties holding the existing watermark; modified in place
+   * @param key      the watermark property name
+   * @param newValue the reported watermark in epoch millis, may be empty
+   * @return {@code true} if {@code props} was modified; {@code false} if {@code newValue} is empty
+   *         or not higher than the existing watermark
+   * @throws RuntimeException if {@code newValue} is higher than the existing watermark but leaves a gap
+   */
+  private boolean mayUpdateWatermark(FileSystemDataset dataset, State props, String key, String newValue) {
+    if (StringUtils.isEmpty(newValue)) {
+      return false;
+    }
+
+    long current = props.getPropAsLong(key, 0);
+    if (current != 0) {
+      long reported = Long.parseLong(newValue);
+      if (reported <= current) {
+        // Same or older watermark: nothing to publish
+        return false;
+      }
+
+      long expected = getExpectedNextWatermark(current);
+      if (reported != expected) {
+        String errMsg = String.format(
+            "Fail to advance %s of dataset %s: expect %s but got %s, please manually fill the gap and rerun.",
+            key, dataset.datasetRoot(), expected, reported);
+        log.error(errMsg);
+        throw new RuntimeException(errMsg);
+      }
+    }
+
+    props.setProp(key, newValue);
+    return true;
+  }
+
+  /**
+   * Computes the only watermark that may follow {@code previousWatermark} without leaving a gap:
+   * {@code previousWatermark} advanced by 1 unit of {@link #granularity}.
+   *
+   * <p>NOTE(review): the step is taken in {@link ZoneId#systemDefault()}; confirm this matches the
+   * zone in which the watermark was produced.
+   *
+   * @param previousWatermark the existing watermark in epoch millis
+   * @return the expected next watermark in epoch millis
+   */
+  private long getExpectedNextWatermark(Long previousWatermark) {
+    Instant previousInstant = Instant.ofEpochMilli(previousWatermark);
+    ZonedDateTime previous = ZonedDateTime.ofInstant(previousInstant, ZoneId.systemDefault());
+    ZonedDateTime next = TimeIterator.inc(previous, granularity, 1);
+    return next.toInstant().toEpochMilli();
+  }
+
+  /**
+   * Stores the given {@link EventSubmitter} in this action.
+   *
+   * @param submitter the submitter to keep
+   */
+  @Override
+  public void addEventSubmitter(EventSubmitter submitter) {
+    this.submitter = submitter;
+  }
+
+  /**
+   * Resolves the hive db and table from a dataset name of the form {@code table} or
+   * {@code db/table}. A name without a db part uses {@link #defaultHiveDb}.
+   *
+   * @param datasetName the dataset name, with {@code /} separating db and table
+   * @return the resolved db and table
+   * @throws RuntimeException if the name splits into zero or more than two parts
+   */
+  private HiveDatasetFinder.DbAndTable extractDbTable(String datasetName) {
+    String[] segments = datasetName.split("/");
+    switch (segments.length) {
+      case 1:
+        return new HiveDatasetFinder.DbAndTable(defaultHiveDb, segments[0]);
+      case 2:
+        // The dataset name carries its own db
+        return new HiveDatasetFinder.DbAndTable(segments[0], segments[1]);
+      default:
+        throw new RuntimeException(String.format("Unsupported dataset %s", datasetName));
+    }
+  }
+}
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
index 3270819..ceb5108 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
@@ -75,7 +75,7 @@ public class MRCompactionTask extends MRTask {
     List<CompactionVerifier> verifiers = this.suite.getMapReduceVerifiers();
     for (CompactionVerifier verifier : verifiers) {
       if (!verifier.verify(dataset).isSuccessful()) {
-        log.error("Verification {} for {} is not passed.", verifier.getName(), 
dataset.datasetURN());
+        log.error("Verification {} for {} is not passed.", verifier.getName(), 
dataset.getUrn());
         this.onMRTaskComplete (false, new IOException("Compaction verification 
for MR is failed"));
         return;
       }
@@ -83,7 +83,8 @@ public class MRCompactionTask extends MRTask {
 
     if (dataset instanceof FileSystemDataset
         && ((FileSystemDataset)dataset).isVirtual()) {
-      log.info("A trivial compaction job as there is no physical data. Will 
trigger a success complete directly");
+      log.info("A trivial compaction job as there is no physical data for {}."
+          + "Will trigger a success complete directly", dataset.getUrn());
       this.onMRTaskComplete(true, null);
       return;
     }
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionWithWatermarkSuite.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionWithWatermarkSuite.java
new file mode 100644
index 0000000..243f394
--- /dev/null
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionWithWatermarkSuite.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.suite;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.gobblin.compaction.action.CompactionCompleteAction;
+import 
org.apache.gobblin.compaction.action.CompactionCompleteFileOperationAction;
+import org.apache.gobblin.compaction.action.CompactionHiveRegistrationAction;
+import org.apache.gobblin.compaction.action.CompactionMarkDirectoryAction;
+import org.apache.gobblin.compaction.action.CompactionWatermarkAction;
+import org.apache.gobblin.compaction.verify.CompactionThresholdVerifier;
+import org.apache.gobblin.compaction.verify.CompactionTimeRangeVerifier;
+import org.apache.gobblin.compaction.verify.CompactionVerifier;
+import org.apache.gobblin.compaction.verify.CompactionWatermarkChecker;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.dataset.FileSystemDataset;
+
+
+/**
+ * A compaction suite that checks and publishes watermarks for file system datasets laid out as
+ * [path prefix]/[dataset name]/[partition prefix]/yyyy/MM/[dd/HH/mm], for example:
+ * <ul>
+ *   <li> home/event1/hourly/2019/12/31 </li>
+ *   <li> home/event2/hourly/2019/12/31/10 </li>
+ *   <li> home/dbName/tableName/hourly/2019/12/31 </li>
+ * </ul>
+ *
+ * The watermarks are published to hive metastore
+ */
+public class CompactionWithWatermarkSuite extends CompactionSuiteBase {
+  /**
+   * Creates the suite.
+   *
+   * @param state the state passed on to the parent suite
+   */
+  public CompactionWithWatermarkSuite(State state) {
+    super(state);
+  }
+
+  /**
+   * @return the time range verifier followed by the threshold verifier
+   */
+  @Override
+  public List<CompactionVerifier<FileSystemDataset>> getDatasetsFinderVerifiers() {
+    List<CompactionVerifier<FileSystemDataset>> finderVerifiers = new LinkedList<>();
+    finderVerifiers.add(new CompactionTimeRangeVerifier(state));
+    finderVerifiers.add(new CompactionThresholdVerifier(state));
+    return finderVerifiers;
+  }
+
+  /**
+   * @return a single {@link CompactionWatermarkChecker}
+   */
+  @Override
+  public List<CompactionVerifier<FileSystemDataset>> getMapReduceVerifiers() {
+    List<CompactionVerifier<FileSystemDataset>> mrVerifiers = new LinkedList<>();
+    mrVerifiers.add(new CompactionWatermarkChecker(state));
+    return mrVerifiers;
+  }
+
+  /**
+   * @return the complete actions in execution order: file operation, hive registration,
+   *         watermark publishing, then directory marking
+   */
+  @Override
+  public List<CompactionCompleteAction<FileSystemDataset>> getCompactionCompleteActions() {
+    List<CompactionCompleteAction<FileSystemDataset>> actions = new ArrayList<>();
+    actions.add(new CompactionCompleteFileOperationAction(state, getConfigurator()));
+    actions.add(new CompactionHiveRegistrationAction(state));
+    // Watermarks are published right after hive registration
+    actions.add(new CompactionWatermarkAction(state));
+    actions.add(new CompactionMarkDirectoryAction(state, getConfigurator()));
+    return actions;
+  }
+}
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionWithWatermarkSuiteFactory.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionWithWatermarkSuiteFactory.java
new file mode 100644
index 0000000..df479ce
--- /dev/null
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionWithWatermarkSuiteFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.suite;
+
+import org.apache.gobblin.annotation.Alias;
+import org.apache.gobblin.configuration.State;
+
+
+/**
+ * A {@link CompactionSuiteFactory} that creates {@link CompactionWithWatermarkSuite} instances.
+ */
+@Alias("CompactionWithWatermarkSuiteFactory")
+public class CompactionWithWatermarkSuiteFactory implements CompactionSuiteFactory {
+  /**
+   * @param state the state handed to the new suite
+   * @return a new {@link CompactionWithWatermarkSuite}
+   */
+  @Override
+  public CompactionWithWatermarkSuite createSuite (State state) {
+    return new CompactionWithWatermarkSuite(state);
+  }
+}
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionWatermarkChecker.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionWatermarkChecker.java
new file mode 100644
index 0000000..febc8b7
--- /dev/null
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionWatermarkChecker.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.verify;
+
+import java.time.Instant;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoField;
+import java.time.temporal.ChronoUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.compaction.source.CompactionSource;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.dataset.FileSystemDataset;
+import org.apache.gobblin.time.TimeIterator;
+
+
+/**
+ * A {@link CompactionAuditCountVerifier} to report compaction watermarks 
based on verification
+ * result
+ *
+ * <p> A {@code watermarkTime} is the previous time of {@value 
CompactionSource#COMPACTION_INIT_TIME}. It
+ * can be computed in different {@link 
org.apache.gobblin.time.TimeIterator.Granularity}. For example, if
+ * compaction init time is 2019/12/01 18:16:00.000, its compaction watermark 
in minute granularity is the
+ * last millis of previous minute, 2019/12/01 18:15:59.999, and watermark in 
day granularity is the last
+ * millis of previous day, 2019/11/30 23:59:59.999.
+ *
+ * <p> The checker will report {@code watermarkTime} in epoch millis as {@value #COMPACTION_WATERMARK}
+ * regardless of audit counts. If audit counts match, it will also report the time in epoch millis
+ * as {@value #COMPLETION_COMPACTION_WATERMARK}.
+ */
+@Slf4j
+public class CompactionWatermarkChecker extends CompactionAuditCountVerifier {
+
+  public static final String TIME_FORMAT = 
"compactionWatermarkChecker.timeFormat";
+  public static final String COMPACTION_WATERMARK = "compactionWatermark";
+  public static final String COMPLETION_COMPACTION_WATERMARK = 
"completionAndCompactionWatermark";
+
+  private final long watermarkTime;
+  private final String precedingTimeDatasetPartitionName;
+
+  /**
+   * Derives, from the compaction init time in {@code state}, the watermark to report and the name
+   * of the partition that precedes the init time by one granularity unit.
+   *
+   * @param state must hold {@link CompactionSource#COMPACTION_INIT_TIME} in epoch millis and the
+   *              partition name pattern under {@link #TIME_FORMAT}
+   */
+  public CompactionWatermarkChecker(State state) {
+    super(state);
+    long initTimeMillis = state.getPropAsLong(CompactionSource.COMPACTION_INIT_TIME);
+    ZonedDateTime initTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(initTimeMillis), zone);
+    ZonedDateTime oneUnitEarlier = TimeIterator.dec(initTime, granularity, 1);
+    DateTimeFormatter partitionFormat = DateTimeFormatter.ofPattern(state.getProp(TIME_FORMAT));
+    precedingTimeDatasetPartitionName = partitionFormat.format(oneUnitEarlier);
+    watermarkTime = getWatermarkTimeMillis(initTime, granularity);
+  }
+
+  /**
+   * Returns the last millisecond before the start of the {@code granularity} unit that contains
+   * {@code compactionTime}. For example, for 2019/12/01 18:16:00.000 the HOUR watermark is
+   * 2019/12/01 17:59:59.999 and the DAY watermark is 2019/11/30 23:59:59.999.
+   *
+   * @param compactionTime the compaction init time
+   * @param granularity    the unit whose start bounds the watermark
+   * @return the watermark in epoch millis
+   */
+  @VisibleForTesting
+  static long getWatermarkTimeMillis(ZonedDateTime compactionTime, TimeIterator.Granularity granularity) {
+    ZonedDateTime minuteStart = compactionTime.withSecond(0).with(ChronoField.MILLI_OF_SECOND, 0);
+    ZonedDateTime unitStart;
+    switch (granularity) {
+      case HOUR:
+        unitStart = minuteStart.withMinute(0);
+        break;
+      case DAY:
+        unitStart = minuteStart.withHour(0).withMinute(0);
+        break;
+      case MONTH:
+        unitStart = minuteStart.withDayOfMonth(1).withHour(0).withMinute(0);
+        break;
+      case MINUTE:
+      default:
+        // Already truncated to the minute
+        unitStart = minuteStart;
+        break;
+    }
+    // One millisecond before the unit starts
+    ZonedDateTime lastMillisBefore = unitStart.minus(1, ChronoUnit.MILLIS);
+    return lastMillisBefore.toInstant().toEpochMilli();
+  }
+
+  @Override
+  public Result verify(FileSystemDataset dataset) {
+    Result res = super.verify(dataset);
+    if 
(!dataset.datasetRoot().toString().contains(precedingTimeDatasetPartitionName)) 
{
+      return res;
+    }
+
+    // set compaction watermark
+    this.state.setProp(COMPACTION_WATERMARK, watermarkTime);
+    if (enabled && res.isSuccessful()) {
+      log.info("Set dataset {} complete and compaction watermark {}", 
dataset.datasetRoot(), watermarkTime);
+      // If it also passed completeness check
+      this.state.setProp(COMPLETION_COMPACTION_WATERMARK, watermarkTime);
+    } else {
+      log.info("Set dataset {} compaction watermark {}", 
dataset.datasetRoot(), watermarkTime);
+    }
+    return res;
+  }
+}
diff --git 
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/action/CompactionWatermarkActionTest.java
 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/action/CompactionWatermarkActionTest.java
new file mode 100644
index 0000000..038268c
--- /dev/null
+++ 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/action/CompactionWatermarkActionTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.action;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+
+import org.apache.gobblin.compaction.mapreduce.MRCompactor;
+import org.apache.gobblin.compaction.verify.CompactionWatermarkChecker;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.data.management.dataset.SimpleFileSystemDataset;
+import org.apache.gobblin.dataset.FileSystemDataset;
+import org.apache.gobblin.hive.HivePartition;
+import org.apache.gobblin.hive.HiveRegister;
+import org.apache.gobblin.hive.HiveRegistrationUnit;
+import org.apache.gobblin.hive.HiveTable;
+import org.apache.gobblin.hive.spec.HiveSpec;
+
+
+public class CompactionWatermarkActionTest {
+
+  private final String compactionWatermark = "compactionWatermark";
+  private final String completionCompactionWatermark = 
"completionAndCompactionWatermark";
+
+  /**
+   * Covers a dataset name without a db part (resolved against the default db) and a
+   * {@code db/table} dataset name.
+   */
+  @Test
+  public void testUpdateWatermark()
+      throws Exception {
+    // Table only: the default hive db "tracking" applies
+    doTestUpdateWatermark("tracking", "event1", "event1");
+    // Db and table both come from the dataset name
+    doTestUpdateWatermark("db1", "table1", "db1/table1");
+  }
+
+  /**
+   * Runs the publishing scenarios against a mocked hive table {@code db.table} that already holds
+   * a watermark: an older watermark is ignored, a watermark leaving a gap is rejected, and the
+   * continuous next watermark is published.
+   *
+   * @param db      the hive db the mocked table lives in
+   * @param table   the hive table name of the mocked table
+   * @param dataset the dataset name used in the dataset path, {@code table} or {@code db/table}
+   */
+  private void doTestUpdateWatermark(String db, String table, String dataset)
+      throws Exception {
+    State state = new State();
+    String defaultDb = "tracking";
+    state.setProp(CompactionWatermarkAction.DEFAULT_HIVE_DB, defaultDb);
+
+    String inputDir = "/data/tracking";
+    String inputSubDir = "hourly";
+    String destSubDir = "daily";
+    String datasetPath = String.format("%s/%s/%s/2019/12/20", inputDir, dataset, inputSubDir);
+    state.setProp(MRCompactor.COMPACTION_INPUT_DIR, inputDir);
+    state.setProp(MRCompactor.COMPACTION_DEST_DIR, inputDir);
+    state.setProp(MRCompactor.COMPACTION_INPUT_SUBDIR, inputSubDir);
+    state.setProp(MRCompactor.COMPACTION_DEST_SUBDIR, destSubDir);
+    state.setProp(HiveRegister.HIVE_REGISTER_TYPE, MockHiveRegister.class.getName());
+    state.setProp(CompactionWatermarkAction.GRANULARITY, "DAY");
+
+    State tableProps = new State();
+    // 2019-12-31 23:59:59.999 (UTC)
+    String existingWatermark = "1577836799999";
+    tableProps.setProp(compactionWatermark, existingWatermark);
+    tableProps.setProp(completionCompactionWatermark, existingWatermark);
+    HiveTable existingTable = new HiveTable.Builder().withDbName(db).withTableName(table)
+        .withProps(tableProps).build();
+    MockHiveRegister.existingTable = existingTable;
+
+    CompactionWatermarkAction action = new CompactionWatermarkAction(state);
+    FileSystemDataset fsDataset = new SimpleFileSystemDataset(new Path(datasetPath));
+
+    // Will not update if old watermarks are reported
+    String actualWatermark = "1577750399999"; // 2019-12-30 23:59:59.999 (UTC)
+    doWatermarkTest(action, fsDataset, state, actualWatermark, existingWatermark);
+
+    // Will throw a runtime exception if watermark is not continuous
+    Exception exception = null;
+    try {
+      actualWatermark = "1578009599999"; // 2020-01-02 23:59:59.999 (UTC), skips one day
+      doWatermarkTest(action, fsDataset, state, actualWatermark, actualWatermark);
+    } catch (Exception e) {
+      exception = e;
+    }
+    // Fail with a clear message rather than an NPE when the gap is not detected
+    Assert.assertNotNull(exception, "Expected an exception for a non-continuous watermark");
+    Assert.assertEquals(exception.getMessage(),
+        String.format("Fail to advance %s of dataset %s: expect 1577923199999 but got %s, "
+            + "please manually fill the gap and rerun.",
+            compactionWatermark, fsDataset.datasetRoot(), actualWatermark));
+
+    // Will update if newer continuous watermarks are reported
+    actualWatermark = "1577923199999"; // 2020-01-01 23:59:59.999 (UTC)
+    doWatermarkTest(action, fsDataset, state, actualWatermark, actualWatermark);
+  }
+
+  /**
+   * Reports {@code actualWatermark} for both watermark keys, runs the action and checks that the
+   * mocked table ends up holding {@code expectedWatermark} for both keys.
+   */
+  private void doWatermarkTest(CompactionWatermarkAction action, FileSystemDataset fsDataset,
+      State state, String actualWatermark, String expectedWatermark)
+      throws Exception {
+    state.setProp(CompactionWatermarkChecker.COMPACTION_WATERMARK, actualWatermark);
+    state.setProp(CompactionWatermarkChecker.COMPLETION_COMPACTION_WATERMARK, actualWatermark);
+
+    action.onCompactionJobComplete(fsDataset);
+
+    // Read the table only after the action ran: the mock may have swapped it in alterTable
+    State publishedProps = MockHiveRegister.existingTable.getProps();
+    Assert.assertEquals(publishedProps.getProp(compactionWatermark), expectedWatermark);
+    Assert.assertEquals(
+        MockHiveRegister.existingTable.getProps().getProp(completionCompactionWatermark), expectedWatermark);
+  }
+
+  /**
+   * A {@link HiveRegister} backed by a single static table. {@link #getTable} returns
+   * {@link #existingTable} when db and table names match, and {@link #alterTable} replaces it.
+   * Every other operation is a no-op.
+   */
+  public static class MockHiveRegister extends HiveRegister {
+
+    // The one table this mock knows about; set by the test before the action runs
+    static HiveTable existingTable;
+
+    public MockHiveRegister(State state, Optional<String> uri) {
+      super(state);
+    }
+
+    @Override
+    protected void registerPath(HiveSpec spec)
+        throws IOException {
+
+    }
+
+    @Override
+    public boolean createDbIfNotExists(String dbName)
+        throws IOException {
+      return false;
+    }
+
+    @Override
+    public boolean createTableIfNotExists(HiveTable table)
+        throws IOException {
+      return false;
+    }
+
+    @Override
+    public boolean addPartitionIfNotExists(HiveTable table, HivePartition partition)
+        throws IOException {
+      return false;
+    }
+
+    @Override
+    public boolean existsTable(String dbName, String tableName)
+        throws IOException {
+      return false;
+    }
+
+    @Override
+    public boolean existsPartition(String dbName, String tableName, List<HiveRegistrationUnit.Column> partitionKeys,
+        List<String> partitionValues)
+        throws IOException {
+      return false;
+    }
+
+    @Override
+    public void dropTableIfExists(String dbName, String tableName)
+        throws IOException {
+
+    }
+
+    @Override
+    public void dropPartitionIfExists(String dbName, String tableName, List<HiveRegistrationUnit.Column> partitionKeys,
+        List<String> partitionValues)
+        throws IOException {
+
+    }
+
+    @Override
+    public Optional<HiveTable> getTable(String dbName, String tableName)
+        throws IOException {
+      if (dbName.equals(existingTable.getDbName())
+          && tableName.equals(existingTable.getTableName())) {
+        return Optional.of(existingTable);
+      }
+      return Optional.absent();
+    }
+
+    @Override
+    public Optional<HivePartition> getPartition(String dbName, String tableName,
+        List<HiveRegistrationUnit.Column> partitionKeys, List<String> partitionValues)
+        throws IOException {
+      // The mock holds no partitions; an Optional-returning method must never return null
+      return Optional.absent();
+    }
+
+    @Override
+    public void alterTable(HiveTable table)
+        throws IOException {
+      existingTable = table;
+    }
+
+    @Override
+    public void alterPartition(HiveTable table, HivePartition partition)
+        throws IOException {
+
+    }
+  }
+}
diff --git 
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/CompactionWatermarkCheckerTest.java
 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/CompactionWatermarkCheckerTest.java
new file mode 100644
index 0000000..b2c26ee
--- /dev/null
+++ 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/CompactionWatermarkCheckerTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.verify;
+
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.compaction.source.CompactionSource;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.data.management.dataset.SimpleFileSystemDataset;
+import org.apache.gobblin.dataset.FileSystemDataset;
+import org.apache.gobblin.time.TimeIterator;
+
+
+public class CompactionWatermarkCheckerTest {
+
+  private final ZoneId zone = ZoneId.of("America/Los_Angeles");
+
+  @Test
+  public void testGetWatermark() {
+    // 2019/12/01 18:16:00.000
+    ZonedDateTime time = ZonedDateTime.of(2019, 12, 1, 18, 16, 0, 0, zone);
+    // minute watermark is 2019/12/01 18:15:59.999
+    
Assert.assertEquals(CompactionWatermarkChecker.getWatermarkTimeMillis(time, 
TimeIterator.Granularity.MINUTE),
+        1575252959999L);
+    // hour watermark is 2019/12/01 17:59:59.999
+    
Assert.assertEquals(CompactionWatermarkChecker.getWatermarkTimeMillis(time, 
TimeIterator.Granularity.HOUR),
+        1575251999999L);
+    // day watermark is 2019/11/30 23:59:59.999
+    
Assert.assertEquals(CompactionWatermarkChecker.getWatermarkTimeMillis(time, 
TimeIterator.Granularity.DAY),
+        1575187199999L);
+    // month watermark is 2019/11/30 23:59:59.999
+    
Assert.assertEquals(CompactionWatermarkChecker.getWatermarkTimeMillis(time, 
TimeIterator.Granularity.MONTH),
+        1575187199999L);
+  }
+
+  /**
+   * Checks which watermarks the checker records for datasets inside and outside the preceding
+   * day partition, first with the completeness check disabled and then enabled.
+   */
+  @Test
+  public void testVerify() {
+    ZonedDateTime initTime = ZonedDateTime.of(2019, 12, 1, 18, 16, 0, 0, zone);
+    State baseState = new State();
+    baseState.setProp(CompactionSource.COMPACTION_INIT_TIME, initTime.toInstant().toEpochMilli());
+    baseState.setProp(CompactionAuditCountVerifier.COMPACTION_COMMPLETENESS_GRANULARITY, "DAY");
+    baseState.setProp(CompactionWatermarkChecker.TIME_FORMAT, "yyyy/MM/dd");
+
+    // Only the 2019/11/30 path matches the preceding partition name in the configured format
+    FileSystemDataset initDayDataset = new SimpleFileSystemDataset(new Path("/dataset/2019/12/01"));
+    FileSystemDataset precedingDayDataset = new SimpleFileSystemDataset(new Path("/dataset/2019/11/30"));
+    FileSystemDataset dashFormatDataset =
+        new SimpleFileSystemDataset(new Path("/dataset/datepartition=2019-11-30"));
+
+    // CASE: completeness is disabled
+    baseState.setProp(CompactionAuditCountVerifier.COMPACTION_COMMPLETENESS_ENABLED, false);
+    doVerifyDataset(new State(baseState), initDayDataset, null, null);
+    doVerifyDataset(new State(baseState), precedingDayDataset, "1575187199999", null);
+    doVerifyDataset(new State(baseState), dashFormatDataset, null, null);
+
+    // CASE: completeness is enabled
+    baseState.setProp(CompactionAuditCountVerifier.COMPACTION_COMMPLETENESS_ENABLED, true);
+    doVerifyDataset(new State(baseState), initDayDataset, null, null);
+    doVerifyDataset(new State(baseState), precedingDayDataset, "1575187199999", "1575187199999");
+    doVerifyDataset(new State(baseState), dashFormatDataset, null, null);
+  }
+
+  /**
+   * Verifies {@code dataset} with a checker built on {@code state} and asserts the watermarks the
+   * checker left in that state; {@code null} means the property is expected to be absent.
+   */
+  private void doVerifyDataset(State state, FileSystemDataset dataset, String compactionWatermark,
+      String completionAndCompactionWatermark) {
+    new CompactionWatermarkChecker(state).verify(dataset);
+
+    String reportedCompaction = state.getProp(CompactionWatermarkChecker.COMPACTION_WATERMARK);
+    Assert.assertEquals(reportedCompaction, compactionWatermark);
+    String reportedCompletion = state.getProp(CompactionWatermarkChecker.COMPLETION_COMPACTION_WATERMARK);
+    Assert.assertEquals(reportedCompletion, completionAndCompactionWatermark);
+  }
+}
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/TimePartitionGlobFinder.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/TimePartitionGlobFinder.java
index 8a707cc..14dbba0 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/TimePartitionGlobFinder.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/TimePartitionGlobFinder.java
@@ -174,7 +174,14 @@ public class TimePartitionGlobFinder implements 
DatasetsFinder<FileSystemDataset
       // All 12 months
       return new StringBuilder("*");
     }
-    StringBuilder monthOptions = new StringBuilder("{" + startMonth);
+
+    // Append start month
+    StringBuilder monthOptions = new StringBuilder("{");
+    if (startMonth < 10) {
+      monthOptions.append("0");
+    }
+    monthOptions.append(startMonth);
+
     if (endMonth >= startMonth) {
       appendOptions(monthOptions, startMonth + 1, endMonth);
     } else {
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java
index 4646029..97eb940 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java
@@ -186,6 +186,13 @@ public class TimePartitionedGlobFinderTest {
         "{2018,2019}/{11,12,01}/*/*");
     Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, 
end, "yyyy-MM-dd-HH"),
         "{2018,2019}-{11,12,01}*");
+
+    // 2019/1/1 - 2019/1/3
+    start = start.withYear(2019).withMonth(1).withDayOfMonth(1);
+    Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, 
end, slashTimeFormat),
+        "{2019}/{01}/*");
+    Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, 
end, dashTimeFormat),
+        "{2019}-{01}*");
   }
 
   @Test

Reply via email to