This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new d18783d [GOBBLIN-1012] Implement CompactionWithWatermarkSuite
d18783d is described below
commit d18783df7ab29b161cca59ca907fb47e1366b358
Author: zhchen <[email protected]>
AuthorDate: Fri Jan 3 10:59:20 2020 -0800
[GOBBLIN-1012] Implement CompactionWithWatermarkSuite
Closes #2857 from zxcware/comp3
---
.../action/CompactionWatermarkAction.java | 167 +++++++++++++++++
.../compaction/mapreduce/MRCompactionTask.java | 5 +-
.../suite/CompactionWithWatermarkSuite.java | 83 +++++++++
.../suite/CompactionWithWatermarkSuiteFactory.java | 29 +++
.../verify/CompactionWatermarkChecker.java | 109 +++++++++++
.../action/CompactionWatermarkActionTest.java | 205 +++++++++++++++++++++
.../verify/CompactionWatermarkCheckerTest.java | 88 +++++++++
.../dataset/TimePartitionGlobFinder.java | 9 +-
.../dataset/TimePartitionedGlobFinderTest.java | 7 +
9 files changed, 699 insertions(+), 3 deletions(-)
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionWatermarkAction.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionWatermarkAction.java
new file mode 100644
index 0000000..2c6e1c2
--- /dev/null
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionWatermarkAction.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.action;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+
+import org.apache.commons.lang.StringUtils;
+
+import com.google.common.base.Optional;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.compaction.parser.CompactionPathParser;
+import org.apache.gobblin.compaction.verify.CompactionWatermarkChecker;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
+import org.apache.gobblin.dataset.FileSystemDataset;
+import org.apache.gobblin.hive.HiveRegister;
+import org.apache.gobblin.hive.HiveTable;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.time.TimeIterator;
+
+
+/**
+ * Publishes the compaction watermarks reported by {@link CompactionWatermarkChecker} as hive table parameters.
+ *
+ * <p> A stored watermark is only ever advanced by exactly one unit of the configured {@link #GRANULARITY}. If a
+ * newly reported watermark skips ahead, the action errors out because the gap indicates a compaction hole. At the
+ * time of writing, one should manually fill the compaction hole and update the existing watermarks in the hive
+ * table parameters to recover automatic watermark publishing.
+ */
+@Slf4j
+public class CompactionWatermarkAction implements CompactionCompleteAction<FileSystemDataset> {
+
+  public static final String CONF_PREFIX = "compactionWatermarkAction";
+  public static final String GRANULARITY = CONF_PREFIX + ".granularity";
+  public static final String DEFAULT_HIVE_DB = CONF_PREFIX + ".defaultHiveDb";
+
+  private EventSubmitter submitter;
+  private State state;
+  private final String defaultHiveDb;
+  private final TimeIterator.Granularity granularity;
+
+  /**
+   * @param state job state; must contain {@link #GRANULARITY}, and may contain {@link #DEFAULT_HIVE_DB} which is
+   *              used as the hive database when the dataset name does not carry one
+   * @throws IllegalArgumentException if {@link #GRANULARITY} is missing or is not a known granularity name
+   */
+  public CompactionWatermarkAction(State state) {
+    this.state = state;
+    defaultHiveDb = state.getProp(DEFAULT_HIVE_DB);
+
+    String granularityName = state.getProp(GRANULARITY);
+    if (StringUtils.isEmpty(granularityName)) {
+      // Fail with an actionable message instead of a bare NullPointerException
+      throw new IllegalArgumentException(String.format("Missing required property %s", GRANULARITY));
+    }
+    // Locale.ROOT keeps the enum lookup independent of the JVM default locale (e.g. Turkish dotted/dotless i)
+    granularity = TimeIterator.Granularity.valueOf(granularityName.toUpperCase(java.util.Locale.ROOT));
+  }
+
+  /**
+   * Reads the watermarks that {@link CompactionWatermarkChecker} left in the job state and, when they advance the
+   * values stored on the hive table, alters the table to publish them. Nothing is published when neither
+   * watermark is present in the state or when the hive table cannot be found.
+   *
+   * @throws RuntimeException if a reported watermark is ahead of the stored one by more than one granularity unit
+   */
+  @Override
+  public void onCompactionJobComplete(FileSystemDataset dataset)
+      throws IOException {
+
+    String compactionWatermark = state.getProp(CompactionWatermarkChecker.COMPACTION_WATERMARK);
+    String completeCompactionWatermark = state.getProp(CompactionWatermarkChecker.COMPLETION_COMPACTION_WATERMARK);
+    if (StringUtils.isEmpty(compactionWatermark) && StringUtils.isEmpty(completeCompactionWatermark)) {
+      return;
+    }
+
+    CompactionPathParser.CompactionParserResult result = new CompactionPathParser(state).parse(dataset);
+    HiveDatasetFinder.DbAndTable dbAndTable = extractDbTable(result.getDatasetName());
+    String hiveDb = dbAndTable.getDb();
+    String hiveTable = dbAndTable.getTable();
+
+    HiveRegister hiveRegister = HiveRegister.get(state);
+    Optional<HiveTable> tableOptional = hiveRegister.getTable(hiveDb, hiveTable);
+    if (!tableOptional.isPresent()) {
+      log.info("Table {}.{} not found. Skip publishing compaction watermarks", hiveDb, hiveTable);
+      return;
+    }
+
+    HiveTable table = tableOptional.get();
+    State tableProps = table.getProps();
+    // Both watermarks are evaluated unconditionally; a single alterTable call publishes whichever changed
+    boolean shouldUpdate =
+        mayUpdateWatermark(dataset, tableProps, CompactionWatermarkChecker.COMPACTION_WATERMARK, compactionWatermark);
+    if (mayUpdateWatermark(dataset, tableProps, CompactionWatermarkChecker.COMPLETION_COMPACTION_WATERMARK,
+        completeCompactionWatermark)) {
+      shouldUpdate = true;
+    }
+
+    if (shouldUpdate) {
+      log.info("Alter table {}.{} to publish watermarks {}", hiveDb, hiveTable, tableProps);
+      hiveRegister.alterTable(table);
+    }
+  }
+
+  /**
+   * Update watermark if the new one is continuously higher than the existing one
+   *
+   * @param dataset  the dataset being compacted, only used for the error message
+   * @param props    hive table properties, modified in place when the watermark is accepted
+   * @param key      name of the watermark property
+   * @param newValue newly reported watermark in epoch millis, may be empty
+   * @return {@code true} if {@code props} was changed
+   * @throws RuntimeException if the new watermark is higher than the existing one but not exactly one granularity
+   *                          unit ahead of it
+   */
+  private boolean mayUpdateWatermark(FileSystemDataset dataset, State props, String key, String newValue) {
+
+    if (StringUtils.isEmpty(newValue)) {
+      return false;
+    }
+
+    // A missing (or zero) stored watermark means nothing was published yet: accept the first value as is
+    long existing = props.getPropAsLong(key, 0);
+    if (existing == 0) {
+      props.setProp(key, newValue);
+      return true;
+    }
+
+    // Old or repeated watermarks are silently ignored
+    long actualNextWatermark = Long.parseLong(newValue);
+    if (actualNextWatermark <= existing) {
+      return false;
+    }
+
+    long expectedWatermark = getExpectedNextWatermark(existing);
+    if (actualNextWatermark != expectedWatermark) {
+      String errMsg = String.format(
+          "Fail to advance %s of dataset %s: expect %s but got %s, please manually fill the gap and rerun.",
+          key, dataset.datasetRoot(), expectedWatermark, actualNextWatermark);
+      log.error(errMsg);
+      throw new RuntimeException(errMsg);
+    }
+
+    props.setProp(key, newValue);
+    return true;
+  }
+
+  /**
+   * To guarantee watermark continuity, the expected next watermark should be: {@code previousWatermark} + 1
+   * unit of {@link #granularity}
+   *
+   * <p> {@link CompactionWatermarkChecker} reports a watermark as the last millisecond before the start of a
+   * granularity unit. The increment is therefore applied to the first millisecond of the following unit and one
+   * millisecond is subtracted afterwards. Incrementing the end-of-unit timestamp directly is presumably wrong
+   * for {@code MONTH} (assuming {@link TimeIterator#inc} performs calendar addition - TODO confirm): adding one
+   * month to Nov 30 23:59:59.999 would yield Dec 30 23:59:59.999 instead of Dec 31 23:59:59.999.
+   *
+   * <p> NOTE(review): the computation uses the system default zone while {@link CompactionWatermarkChecker}
+   * computes watermarks in its own zone - confirm both always agree.
+   */
+  private long getExpectedNextWatermark(long previousWatermark) {
+    ZonedDateTime startOfNextUnit =
+        ZonedDateTime.ofInstant(Instant.ofEpochMilli(previousWatermark + 1), ZoneId.systemDefault());
+    ZonedDateTime startOfFollowingUnit = TimeIterator.inc(startOfNextUnit, granularity, 1);
+    return startOfFollowingUnit.toInstant().toEpochMilli() - 1;
+  }
+
+  @Override
+  public void addEventSubmitter(EventSubmitter submitter) {
+    this.submitter = submitter;
+  }
+
+  /**
+   * Splits a dataset name of the form {@code table} or {@code db/table} into hive database and table. The
+   * configured default database is used when the name has a single part.
+   *
+   * @throws RuntimeException if the name has more than two parts
+   */
+  private HiveDatasetFinder.DbAndTable extractDbTable(String datasetName) {
+    String[] parts = datasetName.split("/");
+    if (parts.length == 0 || parts.length > 2) {
+      throw new RuntimeException(String.format("Unsupported dataset %s", datasetName));
+    }
+    String hiveDb = defaultHiveDb;
+    String hiveTable = parts[0];
+    // Use the db from the datasetName if it has
+    if (parts.length == 2) {
+      hiveDb = parts[0];
+      hiveTable = parts[1];
+    }
+
+    return new HiveDatasetFinder.DbAndTable(hiveDb, hiveTable);
+  }
+}
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
index 3270819..ceb5108 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
@@ -75,7 +75,7 @@ public class MRCompactionTask extends MRTask {
List<CompactionVerifier> verifiers = this.suite.getMapReduceVerifiers();
for (CompactionVerifier verifier : verifiers) {
if (!verifier.verify(dataset).isSuccessful()) {
- log.error("Verification {} for {} is not passed.", verifier.getName(),
dataset.datasetURN());
+ log.error("Verification {} for {} is not passed.", verifier.getName(),
dataset.getUrn());
this.onMRTaskComplete (false, new IOException("Compaction verification
for MR is failed"));
return;
}
@@ -83,7 +83,8 @@ public class MRCompactionTask extends MRTask {
if (dataset instanceof FileSystemDataset
&& ((FileSystemDataset)dataset).isVirtual()) {
- log.info("A trivial compaction job as there is no physical data. Will
trigger a success complete directly");
+      // Keep a separating space at the end of the first literal: the two literals are concatenated verbatim
+      log.info("A trivial compaction job as there is no physical data for {}. "
+          + "Will trigger a success complete directly", dataset.getUrn());
this.onMRTaskComplete(true, null);
return;
}
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionWithWatermarkSuite.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionWithWatermarkSuite.java
new file mode 100644
index 0000000..243f394
--- /dev/null
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionWithWatermarkSuite.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.suite;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.gobblin.compaction.action.CompactionCompleteAction;
+import
org.apache.gobblin.compaction.action.CompactionCompleteFileOperationAction;
+import org.apache.gobblin.compaction.action.CompactionHiveRegistrationAction;
+import org.apache.gobblin.compaction.action.CompactionMarkDirectoryAction;
+import org.apache.gobblin.compaction.action.CompactionWatermarkAction;
+import org.apache.gobblin.compaction.verify.CompactionThresholdVerifier;
+import org.apache.gobblin.compaction.verify.CompactionTimeRangeVerifier;
+import org.apache.gobblin.compaction.verify.CompactionVerifier;
+import org.apache.gobblin.compaction.verify.CompactionWatermarkChecker;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.dataset.FileSystemDataset;
+
+
+/**
+ * Compaction suite with watermark checking and publishing for file system datasets whose path follows the
+ * pattern [path prefix]/[dataset name]/[partition prefix]/yyyy/MM/[dd/HH/mm], for example:
+ * <ul>
+ *   <li> home/event1/hourly/2019/12/31 </li>
+ *   <li> home/event2/hourly/2019/12/31/10 </li>
+ *   <li> home/dbName/tableName/hourly/2019/12/31 </li>
+ * </ul>
+ *
+ * The watermarks are published to hive metastore
+ */
+public class CompactionWithWatermarkSuite extends CompactionSuiteBase {
+
+  /**
+   * @param state job state handed to the base suite and to every verifier and action created by this suite
+   */
+  public CompactionWithWatermarkSuite(State state) {
+    super(state);
+  }
+
+  /**
+   * @return the time range verifier followed by the threshold verifier
+   */
+  @Override
+  public List<CompactionVerifier<FileSystemDataset>> getDatasetsFinderVerifiers() {
+    List<CompactionVerifier<FileSystemDataset>> verifiers = new LinkedList<>();
+    verifiers.add(new CompactionTimeRangeVerifier(state));
+    verifiers.add(new CompactionThresholdVerifier(state));
+    return verifiers;
+  }
+
+  /**
+   * @return a single {@link CompactionWatermarkChecker}
+   */
+  @Override
+  public List<CompactionVerifier<FileSystemDataset>> getMapReduceVerifiers() {
+    List<CompactionVerifier<FileSystemDataset>> verifiers = new LinkedList<>();
+    verifiers.add(new CompactionWatermarkChecker(state));
+    return verifiers;
+  }
+
+  /**
+   * @return the completion actions in execution order: file operation, hive registration, watermark publishing
+   *         and directory marking
+   */
+  @Override
+  public List<CompactionCompleteAction<FileSystemDataset>> getCompactionCompleteActions() {
+    List<CompactionCompleteAction<FileSystemDataset>> actions = new ArrayList<>();
+    actions.add(new CompactionCompleteFileOperationAction(state, getConfigurator()));
+    actions.add(new CompactionHiveRegistrationAction(state));
+    // Publish compaction watermarks right after hive registration
+    actions.add(new CompactionWatermarkAction(state));
+    actions.add(new CompactionMarkDirectoryAction(state, getConfigurator()));
+    return actions;
+  }
+}
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionWithWatermarkSuiteFactory.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionWithWatermarkSuiteFactory.java
new file mode 100644
index 0000000..df479ce
--- /dev/null
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionWithWatermarkSuiteFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.suite;
+
+import org.apache.gobblin.annotation.Alias;
+import org.apache.gobblin.configuration.State;
+
+
+/**
+ * A {@link CompactionSuiteFactory} that creates {@link CompactionWithWatermarkSuite}s. It is registered under the
+ * alias {@code CompactionWithWatermarkSuiteFactory}.
+ */
+@Alias("CompactionWithWatermarkSuiteFactory")
+public class CompactionWithWatermarkSuiteFactory implements CompactionSuiteFactory {
+
+  /**
+   * @param state job state passed on to the created suite
+   * @return a new {@link CompactionWithWatermarkSuite}
+   */
+  public CompactionWithWatermarkSuite createSuite(State state) {
+    return new CompactionWithWatermarkSuite(state);
+  }
+}
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionWatermarkChecker.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionWatermarkChecker.java
new file mode 100644
index 0000000..febc8b7
--- /dev/null
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionWatermarkChecker.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.verify;
+
+import java.time.Instant;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoField;
+import java.time.temporal.ChronoUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.compaction.source.CompactionSource;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.dataset.FileSystemDataset;
+import org.apache.gobblin.time.TimeIterator;
+
+
+/**
+ * A {@link CompactionAuditCountVerifier} to report compaction watermarks based on verification
+ * result
+ *
+ * <p> A {@code watermarkTime} is the previous time of {@value CompactionSource#COMPACTION_INIT_TIME}. It
+ * can be computed in different {@link org.apache.gobblin.time.TimeIterator.Granularity}. For example, if
+ * compaction init time is 2019/12/01 18:16:00.000, its compaction watermark in minute granularity is the
+ * last millis of previous minute, 2019/12/01 18:15:59.999, and watermark in day granularity is the last
+ * millis of previous day, 2019/11/30 23:59:59.999.
+ *
+ * <p> The checker will report {@code watermarkTime} in epoch millis as {@value #COMPACTION_WATERMARK}
+ * regardless of audit counts. If audit counts match, it will also report the time in epoch millis
+ * as {@value #COMPLETION_COMPACTION_WATERMARK}
+ */
+@Slf4j
+public class CompactionWatermarkChecker extends CompactionAuditCountVerifier {
+
+  public static final String TIME_FORMAT = "compactionWatermarkChecker.timeFormat";
+  public static final String COMPACTION_WATERMARK = "compactionWatermark";
+  public static final String COMPLETION_COMPACTION_WATERMARK = "completionAndCompactionWatermark";
+
+  private final long watermarkTime;
+  private final String precedingTimeDatasetPartitionName;
+
+  /**
+   * Derives, from the compaction init time in {@code state}, both the watermark to report and the partition
+   * name (formatted with the {@link #TIME_FORMAT} pattern) of the time unit preceding the compaction time.
+   *
+   * @param state job state providing {@link CompactionSource#COMPACTION_INIT_TIME} and {@link #TIME_FORMAT}
+   */
+  public CompactionWatermarkChecker(State state) {
+    super(state);
+    long initTimeMillis = state.getPropAsLong(CompactionSource.COMPACTION_INIT_TIME);
+    ZonedDateTime compactionTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(initTimeMillis), zone);
+    ZonedDateTime previousUnitTime = TimeIterator.dec(compactionTime, granularity, 1);
+    DateTimeFormatter partitionFormatter = DateTimeFormatter.ofPattern(state.getProp(TIME_FORMAT));
+    precedingTimeDatasetPartitionName = partitionFormatter.format(previousUnitTime);
+    watermarkTime = getWatermarkTimeMillis(compactionTime, granularity);
+  }
+
+  /**
+   * Computes the watermark for a compaction time: the last millisecond before the start of the granularity
+   * unit that contains {@code compactionTime}.
+   *
+   * @return the watermark in epoch millis
+   */
+  @VisibleForTesting
+  static long getWatermarkTimeMillis(ZonedDateTime compactionTime, TimeIterator.Granularity granularity) {
+    ZonedDateTime minuteStart = compactionTime.withSecond(0).with(ChronoField.MILLI_OF_SECOND, 0);
+    ZonedDateTime unitStart;
+    switch (granularity) {
+      case HOUR:
+        unitStart = minuteStart.withMinute(0);
+        break;
+      case DAY:
+        unitStart = minuteStart.withHour(0).withMinute(0);
+        break;
+      case MONTH:
+        unitStart = minuteStart.withDayOfMonth(1).withHour(0).withMinute(0);
+        break;
+      default:
+        // MINUTE, like any granularity not listed above, keeps the start of the minute
+        unitStart = minuteStart;
+        break;
+    }
+    // Step back one millisecond to land on the last millis of the preceding unit
+    return unitStart.minus(1, ChronoUnit.MILLIS).toInstant().toEpochMilli();
+  }
+
+  /**
+   * Runs the parent verification and, only for a dataset whose root path contains the preceding partition
+   * name, records the watermark in the state. The completion watermark is recorded in addition when the
+   * check is enabled and the parent verification succeeded.
+   *
+   * @return the unmodified result of the parent verification
+   */
+  @Override
+  public Result verify(FileSystemDataset dataset) {
+    Result auditResult = super.verify(dataset);
+    String datasetRootPath = dataset.datasetRoot().toString();
+    if (datasetRootPath.contains(precedingTimeDatasetPartitionName)) {
+      // set compaction watermark
+      this.state.setProp(COMPACTION_WATERMARK, watermarkTime);
+      if (enabled && auditResult.isSuccessful()) {
+        log.info("Set dataset {} complete and compaction watermark {}", dataset.datasetRoot(), watermarkTime);
+        // If it also passed completeness check
+        this.state.setProp(COMPLETION_COMPACTION_WATERMARK, watermarkTime);
+      } else {
+        log.info("Set dataset {} compaction watermark {}", dataset.datasetRoot(), watermarkTime);
+      }
+    }
+    return auditResult;
+  }
+}
diff --git
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/action/CompactionWatermarkActionTest.java
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/action/CompactionWatermarkActionTest.java
new file mode 100644
index 0000000..038268c
--- /dev/null
+++
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/action/CompactionWatermarkActionTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.action;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+
+import org.apache.gobblin.compaction.mapreduce.MRCompactor;
+import org.apache.gobblin.compaction.verify.CompactionWatermarkChecker;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.data.management.dataset.SimpleFileSystemDataset;
+import org.apache.gobblin.dataset.FileSystemDataset;
+import org.apache.gobblin.hive.HivePartition;
+import org.apache.gobblin.hive.HiveRegister;
+import org.apache.gobblin.hive.HiveRegistrationUnit;
+import org.apache.gobblin.hive.HiveTable;
+import org.apache.gobblin.hive.spec.HiveSpec;
+
+
+/**
+ * Tests that {@link CompactionWatermarkAction} ignores old watermarks, rejects non-continuous ones and publishes
+ * continuous ones to the hive table properties. Hive is replaced by {@link MockHiveRegister}.
+ */
+public class CompactionWatermarkActionTest {
+
+  private final String compactionWatermark = "compactionWatermark";
+  private final String completionCompactionWatermark = "completionAndCompactionWatermark";
+
+  @Test
+  public void testUpdateWatermark()
+      throws Exception {
+    // Dataset name without db falls back to the default hive db
+    doTestUpdateWatermark("tracking","event1", "event1");
+    // Dataset name of the form db/table
+    doTestUpdateWatermark("db1","table1", "db1/table1");
+  }
+
+  /**
+   * @param db      database of the hive table the mock register knows about
+   * @param table   name of the hive table the mock register knows about
+   * @param dataset dataset name as it appears in the dataset path
+   */
+  private void doTestUpdateWatermark(String db, String table, String dataset)
+      throws Exception {
+    State state = new State();
+    String defaultDb = "tracking";
+    state.setProp(CompactionWatermarkAction.DEFAULT_HIVE_DB, defaultDb);
+
+    String inputDir = "/data/tracking";
+    String inputSubDir = "hourly";
+    String destSubDir = "daily";
+    String datasetPath = String.format("%s/%s/%s/2019/12/20", inputDir, dataset, inputSubDir);
+    state.setProp(MRCompactor.COMPACTION_INPUT_DIR, inputDir);
+    state.setProp(MRCompactor.COMPACTION_DEST_DIR, inputDir);
+    state.setProp(MRCompactor.COMPACTION_INPUT_SUBDIR, inputSubDir);
+    state.setProp(MRCompactor.COMPACTION_DEST_SUBDIR, destSubDir);
+    state.setProp(HiveRegister.HIVE_REGISTER_TYPE, MockHiveRegister.class.getName());
+    state.setProp(CompactionWatermarkAction.GRANULARITY, "DAY");
+
+    State tableProps = new State();
+    // 2019-12-31 23:59:59.999 (UTC)
+    String existingWatermark = "1577836799999";
+    tableProps.setProp(compactionWatermark, existingWatermark);
+    tableProps.setProp(completionCompactionWatermark, existingWatermark);
+    HiveTable existingTable = new HiveTable.Builder().withDbName(db).withTableName(table)
+        .withProps(tableProps).build();
+    MockHiveRegister.existingTable = existingTable;
+
+    CompactionWatermarkAction action = new CompactionWatermarkAction(state);
+    FileSystemDataset fsDataset = new SimpleFileSystemDataset(new Path(datasetPath));
+
+    // Will not update if old watermarks are reported
+    String actualWatermark = "1577750399999"; // 2019-12-30 23:59:59.999 (UTC)
+    doWatermarkTest(action, fsDataset, state, actualWatermark, existingWatermark);
+
+    // Will throw a runtime exception if watermark is not continuous
+    Exception exception = null;
+    try {
+      actualWatermark = "1578009599999"; // 2020-01-02 23:59:59.999 (UTC), two days after the existing one
+      doWatermarkTest(action, fsDataset, state, actualWatermark, actualWatermark);
+    } catch (Exception e) {
+      exception = e;
+    }
+    // Fail with a readable message, not a NullPointerException, if the gap was not detected
+    Assert.assertNotNull(exception, "Expected an exception for a non-continuous watermark");
+    Assert.assertEquals(exception.getMessage(),
+        String.format("Fail to advance %s of dataset %s: expect 1577923199999 but got %s, "
+                + "please manually fill the gap and rerun.",
+            compactionWatermark, fsDataset.datasetRoot(), actualWatermark));
+
+    // Will update if newer continuous watermarks are reported
+    actualWatermark = "1577923199999"; // 2020-01-01 23:59:59.999 (UTC)
+    doWatermarkTest(action, fsDataset, state, actualWatermark, actualWatermark);
+  }
+
+  /**
+   * Reports {@code actualWatermark} for both watermark keys, runs the action and asserts that the table
+   * properties held by the mock register equal {@code expectedWatermark} afterwards.
+   */
+  private void doWatermarkTest(CompactionWatermarkAction action, FileSystemDataset fsDataset,
+      State state, String actualWatermark, String expectedWatermark)
+      throws Exception {
+    state.setProp(CompactionWatermarkChecker.COMPACTION_WATERMARK, actualWatermark);
+    state.setProp(CompactionWatermarkChecker.COMPLETION_COMPACTION_WATERMARK, actualWatermark);
+
+    action.onCompactionJobComplete(fsDataset);
+
+    Assert.assertEquals(MockHiveRegister.existingTable.getProps().getProp(compactionWatermark),
+        expectedWatermark);
+    Assert.assertEquals(MockHiveRegister.existingTable.getProps().getProp(completionCompactionWatermark),
+        expectedWatermark);
+  }
+
+  /**
+   * A {@link HiveRegister} that knows a single table, {@link #existingTable}, and records the table passed to
+   * {@link #alterTable(HiveTable)} in its place. All other operations are no-ops.
+   */
+  public static class MockHiveRegister extends HiveRegister {
+
+    static HiveTable existingTable;
+
+    public MockHiveRegister(State state, Optional<String> uri) {
+      super(state);
+    }
+
+    @Override
+    protected void registerPath(HiveSpec spec)
+        throws IOException {
+
+    }
+
+    @Override
+    public boolean createDbIfNotExists(String dbName)
+        throws IOException {
+      return false;
+    }
+
+    @Override
+    public boolean createTableIfNotExists(HiveTable table)
+        throws IOException {
+      return false;
+    }
+
+    @Override
+    public boolean addPartitionIfNotExists(HiveTable table, HivePartition partition)
+        throws IOException {
+      return false;
+    }
+
+    @Override
+    public boolean existsTable(String dbName, String tableName)
+        throws IOException {
+      return false;
+    }
+
+    @Override
+    public boolean existsPartition(String dbName, String tableName, List<HiveRegistrationUnit.Column> partitionKeys,
+        List<String> partitionValues)
+        throws IOException {
+      return false;
+    }
+
+    @Override
+    public void dropTableIfExists(String dbName, String tableName)
+        throws IOException {
+
+    }
+
+    @Override
+    public void dropPartitionIfExists(String dbName, String tableName, List<HiveRegistrationUnit.Column> partitionKeys,
+        List<String> partitionValues)
+        throws IOException {
+
+    }
+
+    @Override
+    public Optional<HiveTable> getTable(String dbName, String tableName)
+        throws IOException {
+      if (dbName.equals(existingTable.getDbName())
+          && tableName.equals(existingTable.getTableName())) {
+        return Optional.of(existingTable);
+      }
+      return Optional.absent();
+    }
+
+    @Override
+    public Optional<HivePartition> getPartition(String dbName, String tableName,
+        List<HiveRegistrationUnit.Column> partitionKeys, List<String> partitionValues)
+        throws IOException {
+      // An Optional-returning method must never return null
+      return Optional.absent();
+    }
+
+    @Override
+    public void alterTable(HiveTable table)
+        throws IOException {
+      existingTable = table;
+    }
+
+    @Override
+    public void alterPartition(HiveTable table, HivePartition partition)
+        throws IOException {
+
+    }
+  }
+}
diff --git
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/CompactionWatermarkCheckerTest.java
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/CompactionWatermarkCheckerTest.java
new file mode 100644
index 0000000..b2c26ee
--- /dev/null
+++
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/CompactionWatermarkCheckerTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.verify;
+
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.compaction.source.CompactionSource;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.data.management.dataset.SimpleFileSystemDataset;
+import org.apache.gobblin.dataset.FileSystemDataset;
+import org.apache.gobblin.time.TimeIterator;
+
+
+/**
+ * Tests the watermark computation of {@link CompactionWatermarkChecker} and the watermarks it reports to the
+ * state for datasets of different partitions.
+ */
+public class CompactionWatermarkCheckerTest {
+
+  private final ZoneId zone = ZoneId.of("America/Los_Angeles");
+
+  @Test
+  public void testGetWatermark() {
+    // 2019/12/01 18:16:00.000
+    ZonedDateTime compactionTime = ZonedDateTime.of(2019, 12, 1, 18, 16, 0, 0, zone);
+
+    // minute watermark is 2019/12/01 18:15:59.999
+    long minuteWatermark =
+        CompactionWatermarkChecker.getWatermarkTimeMillis(compactionTime, TimeIterator.Granularity.MINUTE);
+    Assert.assertEquals(minuteWatermark, 1575252959999L);
+
+    // hour watermark is 2019/12/01 17:59:59.999
+    long hourWatermark =
+        CompactionWatermarkChecker.getWatermarkTimeMillis(compactionTime, TimeIterator.Granularity.HOUR);
+    Assert.assertEquals(hourWatermark, 1575251999999L);
+
+    // day watermark is 2019/11/30 23:59:59.999
+    long dayWatermark =
+        CompactionWatermarkChecker.getWatermarkTimeMillis(compactionTime, TimeIterator.Granularity.DAY);
+    Assert.assertEquals(dayWatermark, 1575187199999L);
+
+    // month watermark is 2019/11/30 23:59:59.999
+    long monthWatermark =
+        CompactionWatermarkChecker.getWatermarkTimeMillis(compactionTime, TimeIterator.Granularity.MONTH);
+    Assert.assertEquals(monthWatermark, 1575187199999L);
+  }
+
+  @Test
+  public void testVerify() {
+    ZonedDateTime compactionTime = ZonedDateTime.of(2019, 12, 1, 18, 16, 0, 0, zone);
+    State baseState = new State();
+    baseState.setProp(CompactionSource.COMPACTION_INIT_TIME, compactionTime.toInstant().toEpochMilli());
+    baseState.setProp(CompactionAuditCountVerifier.COMPACTION_COMMPLETENESS_GRANULARITY, "DAY");
+    baseState.setProp(CompactionWatermarkChecker.TIME_FORMAT, "yyyy/MM/dd");
+
+    // Only the 2019/11/30 partition matches the "yyyy/MM/dd" name of the day preceding the compaction time
+    FileSystemDataset dataset1201 = new SimpleFileSystemDataset(new Path("/dataset/2019/12/01"));
+    FileSystemDataset dataset1130 = new SimpleFileSystemDataset(new Path("/dataset/2019/11/30"));
+    FileSystemDataset datasetDash = new SimpleFileSystemDataset(new Path("/dataset/datepartition=2019-11-30"));
+
+    // CASE: completeness is disabled
+    baseState.setProp(CompactionAuditCountVerifier.COMPACTION_COMMPLETENESS_ENABLED, false);
+    doVerifyDataset(new State(baseState), dataset1201, null, null);
+    doVerifyDataset(new State(baseState), dataset1130, "1575187199999", null);
+    doVerifyDataset(new State(baseState), datasetDash, null, null);
+
+    // CASE: completeness is enabled
+    baseState.setProp(CompactionAuditCountVerifier.COMPACTION_COMMPLETENESS_ENABLED, true);
+    doVerifyDataset(new State(baseState), dataset1201, null, null);
+    doVerifyDataset(new State(baseState), dataset1130, "1575187199999", "1575187199999");
+    doVerifyDataset(new State(baseState), datasetDash, null, null);
+  }
+
+  /**
+   * Verifies {@code dataset} with a fresh checker built on {@code state} and asserts the two watermark
+   * properties found in {@code state} afterwards; {@code null} means the property must be absent.
+   */
+  private void doVerifyDataset(State state, FileSystemDataset dataset, String compactionWatermark,
+      String completionAndCompactionWatermark) {
+    new CompactionWatermarkChecker(state).verify(dataset);
+
+    String reportedCompactionWatermark = state.getProp(CompactionWatermarkChecker.COMPACTION_WATERMARK);
+    Assert.assertEquals(reportedCompactionWatermark, compactionWatermark);
+
+    String reportedCompletionWatermark = state.getProp(CompactionWatermarkChecker.COMPLETION_COMPACTION_WATERMARK);
+    Assert.assertEquals(reportedCompletionWatermark, completionAndCompactionWatermark);
+  }
+}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/TimePartitionGlobFinder.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/TimePartitionGlobFinder.java
index 8a707cc..14dbba0 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/TimePartitionGlobFinder.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/TimePartitionGlobFinder.java
@@ -174,7 +174,14 @@ public class TimePartitionGlobFinder implements
DatasetsFinder<FileSystemDataset
// All 12 months
return new StringBuilder("*");
}
- StringBuilder monthOptions = new StringBuilder("{" + startMonth);
+
+ // Append start month
+ StringBuilder monthOptions = new StringBuilder("{");
+ if (startMonth < 10) {
+ monthOptions.append("0");
+ }
+ monthOptions.append(startMonth);
+
if (endMonth >= startMonth) {
appendOptions(monthOptions, startMonth + 1, endMonth);
} else {
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java
index 4646029..97eb940 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java
@@ -186,6 +186,13 @@ public class TimePartitionedGlobFinderTest {
"{2018,2019}/{11,12,01}/*/*");
Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start,
end, "yyyy-MM-dd-HH"),
"{2018,2019}-{11,12,01}*");
+
+ // 2019/1/1 - 2019/1/3
+ start = start.withYear(2019).withMonth(1).withDayOfMonth(1);
+ Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start,
end, slashTimeFormat),
+ "{2019}/{01}/*");
+ Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start,
end, dashTimeFormat),
+ "{2019}-{01}*");
}
@Test