This is an automated email from the ASF dual-hosted git repository. suvasude pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push: new d18783d [GOBBLIN-1012] Implement CompactionWithWatermarkSuite d18783d is described below commit d18783df7ab29b161cca59ca907fb47e1366b358 Author: zhchen <zhc...@linkedin.com> AuthorDate: Fri Jan 3 10:59:20 2020 -0800 [GOBBLIN-1012] Implement CompactionWithWatermarkSuite Closes #2857 from zxcware/comp3 --- .../action/CompactionWatermarkAction.java | 167 +++++++++++++++++ .../compaction/mapreduce/MRCompactionTask.java | 5 +- .../suite/CompactionWithWatermarkSuite.java | 83 +++++++++ .../suite/CompactionWithWatermarkSuiteFactory.java | 29 +++ .../verify/CompactionWatermarkChecker.java | 109 +++++++++++ .../action/CompactionWatermarkActionTest.java | 205 +++++++++++++++++++++ .../verify/CompactionWatermarkCheckerTest.java | 88 +++++++++ .../dataset/TimePartitionGlobFinder.java | 9 +- .../dataset/TimePartitionedGlobFinderTest.java | 7 + 9 files changed, 699 insertions(+), 3 deletions(-) diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionWatermarkAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionWatermarkAction.java new file mode 100644 index 0000000..2c6e1c2 --- /dev/null +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionWatermarkAction.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.compaction.action; + +import java.io.IOException; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; + +import org.apache.commons.lang.StringUtils; + +import com.google.common.base.Optional; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.compaction.parser.CompactionPathParser; +import org.apache.gobblin.compaction.verify.CompactionWatermarkChecker; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder; +import org.apache.gobblin.dataset.FileSystemDataset; +import org.apache.gobblin.hive.HiveRegister; +import org.apache.gobblin.hive.HiveTable; +import org.apache.gobblin.metrics.event.EventSubmitter; +import org.apache.gobblin.time.TimeIterator; + + +/** + * The class publishes compaction watermarks, reported by {@link CompactionWatermarkChecker}, as hive table parameters. + * It guarantees compaction watermark is updated continuously and errors out if there is a gap, which indicates a + * compaction hole. 
At the time of writing, one should manually fill the compaction hole and update the existing + * watermarks in hive table parameters to recover automatic watermark publish + */ +@Slf4j +public class CompactionWatermarkAction implements CompactionCompleteAction<FileSystemDataset> { + + public static final String CONF_PREFIX = "compactionWatermarkAction"; + public static final String GRANULARITY = CONF_PREFIX + ".granularity"; + public static final String DEFAULT_HIVE_DB = CONF_PREFIX + ".defaultHiveDb"; + + private EventSubmitter submitter; + private State state; + private final String defaultHiveDb; + private final TimeIterator.Granularity granularity; + + public CompactionWatermarkAction(State state) { + this.state = state; + defaultHiveDb = state.getProp(DEFAULT_HIVE_DB); + granularity = TimeIterator.Granularity.valueOf(state.getProp(GRANULARITY).toUpperCase()); + } + + @Override + public void onCompactionJobComplete(FileSystemDataset dataset) + throws IOException { + + String compactionWatermark = state.getProp(CompactionWatermarkChecker.COMPACTION_WATERMARK); + String completeCompactionWatermark = state.getProp(CompactionWatermarkChecker.COMPLETION_COMPACTION_WATERMARK); + if (StringUtils.isEmpty(compactionWatermark) && StringUtils.isEmpty(completeCompactionWatermark)) { + return; + } + + CompactionPathParser.CompactionParserResult result = new CompactionPathParser(state).parse(dataset); + HiveDatasetFinder.DbAndTable dbAndTable = extractDbTable(result.getDatasetName()); + String hiveDb = dbAndTable.getDb(); + String hiveTable = dbAndTable.getTable(); + + HiveRegister hiveRegister = HiveRegister.get(state); + Optional<HiveTable> tableOptional = hiveRegister.getTable(hiveDb, hiveTable); + if (!tableOptional.isPresent()) { + log.info("Table {}.{} not found. 
Skip publishing compaction watermarks", hiveDb, hiveTable); + return; + } + + HiveTable table = tableOptional.get(); + State tableProps = table.getProps(); + boolean shouldUpdate = + mayUpdateWatermark(dataset, tableProps, CompactionWatermarkChecker.COMPACTION_WATERMARK, compactionWatermark); + if (mayUpdateWatermark(dataset, tableProps, CompactionWatermarkChecker.COMPLETION_COMPACTION_WATERMARK, + completeCompactionWatermark)) { + shouldUpdate = true; + } + + if (shouldUpdate) { + log.info("Alter table {}.{} to publish watermarks {}", hiveDb, hiveTable, tableProps); + hiveRegister.alterTable(table); + } + } + + /** + * Update watermark if the new one is continuously higher than the existing one + */ + private boolean mayUpdateWatermark(FileSystemDataset dataset, State props, String key, String newValue) { + + if (StringUtils.isEmpty(newValue)) { + return false; + } + + long existing = props.getPropAsLong(key, 0); + if (existing == 0) { + props.setProp(key, newValue); + return true; + } + + long actualNextWatermark = Long.parseLong(newValue); + if (actualNextWatermark <= existing) { + return false; + } + + long expectedWatermark = getExpectedNextWatermark(existing); + if (actualNextWatermark != expectedWatermark) { + String errMsg = String.format( + "Fail to advance %s of dataset %s: expect %s but got %s, please manually fill the gap and rerun.", + key, dataset.datasetRoot(), expectedWatermark, actualNextWatermark); + log.error(errMsg); + throw new RuntimeException(errMsg); + } + + props.setProp(key, newValue); + return true; + } + + /** + * To guarantee watermark continuity, the expected next watermark should be: {@code previousWatermark} + 1 + * unit of {@link #granularity} + */ + private long getExpectedNextWatermark(Long previousWatermark) { + ZonedDateTime previousWatermarkTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(previousWatermark), + ZoneId.systemDefault()); + ZonedDateTime nextWatermarkTime = TimeIterator.inc(previousWatermarkTime, granularity, 
1); + return nextWatermarkTime.toInstant().toEpochMilli(); + } + + @Override + public void addEventSubmitter(EventSubmitter submitter) { + this.submitter = submitter; + } + + private HiveDatasetFinder.DbAndTable extractDbTable(String datasetName) { + String[] parts = datasetName.split("/"); + if (parts.length == 0 || parts.length > 2) { + throw new RuntimeException(String.format("Unsupported dataset %s", datasetName)); + } + String hiveDb = defaultHiveDb; + String hiveTable = parts[0]; + // Use the db from the datasetName if it has + if (parts.length == 2) { + hiveDb = parts[0]; + hiveTable = parts[1]; + } + + return new HiveDatasetFinder.DbAndTable(hiveDb, hiveTable); + } +} diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java index 3270819..ceb5108 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java @@ -75,7 +75,7 @@ public class MRCompactionTask extends MRTask { List<CompactionVerifier> verifiers = this.suite.getMapReduceVerifiers(); for (CompactionVerifier verifier : verifiers) { if (!verifier.verify(dataset).isSuccessful()) { - log.error("Verification {} for {} is not passed.", verifier.getName(), dataset.datasetURN()); + log.error("Verification {} for {} is not passed.", verifier.getName(), dataset.getUrn()); this.onMRTaskComplete (false, new IOException("Compaction verification for MR is failed")); return; } @@ -83,7 +83,8 @@ public class MRCompactionTask extends MRTask { if (dataset instanceof FileSystemDataset && ((FileSystemDataset)dataset).isVirtual()) { - log.info("A trivial compaction job as there is no physical data. Will trigger a success complete directly"); + log.info("A trivial compaction job as there is no physical data for {}." 
+ + "Will trigger a success complete directly", dataset.getUrn()); this.onMRTaskComplete(true, null); return; } diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionWithWatermarkSuite.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionWithWatermarkSuite.java new file mode 100644 index 0000000..243f394 --- /dev/null +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionWithWatermarkSuite.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.compaction.suite; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; + +import org.apache.gobblin.compaction.action.CompactionCompleteAction; +import org.apache.gobblin.compaction.action.CompactionCompleteFileOperationAction; +import org.apache.gobblin.compaction.action.CompactionHiveRegistrationAction; +import org.apache.gobblin.compaction.action.CompactionMarkDirectoryAction; +import org.apache.gobblin.compaction.action.CompactionWatermarkAction; +import org.apache.gobblin.compaction.verify.CompactionThresholdVerifier; +import org.apache.gobblin.compaction.verify.CompactionTimeRangeVerifier; +import org.apache.gobblin.compaction.verify.CompactionVerifier; +import org.apache.gobblin.compaction.verify.CompactionWatermarkChecker; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.dataset.FileSystemDataset; + + +/** + * Compaction suite with watermark checking and publishing for file system dataset of + * path pattern, [path prefix]/[dataset name]/[partition prefix]/yyyy/MM/[dd/HH/mm], for + * example: + * <ul> + * <li> home/event1/hourly/2019/12/31 </li> + * <li> home/event2/hourly/2019/12/31/10 </li> + * <li> home/dbName/tableName/hourly/2019/12/31 </li> + * </ul> + * + * The watermarks are published to hive metastore + */ +public class CompactionWithWatermarkSuite extends CompactionSuiteBase { + /** + * Constructor + * @param state + */ + public CompactionWithWatermarkSuite(State state) { + super(state); + } + + @Override + public List<CompactionVerifier<FileSystemDataset>> getDatasetsFinderVerifiers() { + List<CompactionVerifier<FileSystemDataset>> list = new LinkedList<>(); + list.add(new CompactionTimeRangeVerifier(state)); + list.add(new CompactionThresholdVerifier(state)); + return list; + } + + @Override + public List<CompactionVerifier<FileSystemDataset>> getMapReduceVerifiers() { + List<CompactionVerifier<FileSystemDataset>> list = new LinkedList<>(); + 
list.add(new CompactionWatermarkChecker(state)); + return list; + } + + @Override + public List<CompactionCompleteAction<FileSystemDataset>> getCompactionCompleteActions() { + ArrayList<CompactionCompleteAction<FileSystemDataset>> array = new ArrayList<>(); + array.add(new CompactionCompleteFileOperationAction(state, getConfigurator())); + array.add(new CompactionHiveRegistrationAction(state)); + // Publish compaction watermarks right after hive registration + array.add(new CompactionWatermarkAction(state)); + array.add(new CompactionMarkDirectoryAction(state, getConfigurator())); + return array; + } +} diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionWithWatermarkSuiteFactory.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionWithWatermarkSuiteFactory.java new file mode 100644 index 0000000..df479ce --- /dev/null +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionWithWatermarkSuiteFactory.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.compaction.suite; + +import org.apache.gobblin.annotation.Alias; +import org.apache.gobblin.configuration.State; + + +@Alias("CompactionWithWatermarkSuiteFactory") +public class CompactionWithWatermarkSuiteFactory implements CompactionSuiteFactory { + public CompactionWithWatermarkSuite createSuite (State state) { + return new CompactionWithWatermarkSuite(state); + } +} diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionWatermarkChecker.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionWatermarkChecker.java new file mode 100644 index 0000000..febc8b7 --- /dev/null +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionWatermarkChecker.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.compaction.verify; + +import java.time.Instant; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoField; +import java.time.temporal.ChronoUnit; + +import com.google.common.annotations.VisibleForTesting; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.compaction.source.CompactionSource; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.dataset.FileSystemDataset; +import org.apache.gobblin.time.TimeIterator; + + +/** + * A {@link CompactionAuditCountVerifier} to report compaction watermarks based on verification + * result + * + * <p> A {@code watermarkTime} is the previous time of {@value CompactionSource#COMPACTION_INIT_TIME}. It + * can be computed in different {@link org.apache.gobblin.time.TimeIterator.Granularity}. For example, if + * compaction init time is 2019/12/01 18:16:00.000, its compaction watermark in minute granularity is the + * last millis of previous minute, 2019/12/01 18:15:59.999, and watermark in day granularity is the last + * millis of previous day, 2019/11/30 23:59:59.999. + * + * <p> The checker will report {@code watermarkTime} in epoch millis as {@value COMPACTION_WATERMARK} + * regardless of audit counts. 
If audit counts match, it will also report the time in epoch millis + * as {@value COMPLETION_COMPACTION_WATERMARK} + */ +@Slf4j +public class CompactionWatermarkChecker extends CompactionAuditCountVerifier { + + public static final String TIME_FORMAT = "compactionWatermarkChecker.timeFormat"; + public static final String COMPACTION_WATERMARK = "compactionWatermark"; + public static final String COMPLETION_COMPACTION_WATERMARK = "completionAndCompactionWatermark"; + + private final long watermarkTime; + private final String precedingTimeDatasetPartitionName; + + public CompactionWatermarkChecker(State state) { + super(state); + ZonedDateTime compactionTime = ZonedDateTime.ofInstant( + Instant.ofEpochMilli(state.getPropAsLong(CompactionSource.COMPACTION_INIT_TIME)), zone); + ZonedDateTime precedingTime = TimeIterator.dec(compactionTime, granularity, 1); + DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern(state.getProp(TIME_FORMAT)); + precedingTimeDatasetPartitionName = timeFormatter.format(precedingTime); + watermarkTime = getWatermarkTimeMillis(compactionTime, granularity); + } + + @VisibleForTesting + static long getWatermarkTimeMillis(ZonedDateTime compactionTime, TimeIterator.Granularity granularity) { + ZonedDateTime startOfMinute = compactionTime.withSecond(0).with(ChronoField.MILLI_OF_SECOND, 0); + ZonedDateTime startOfTimeGranularity = startOfMinute; + switch (granularity) { + case MINUTE: + break; + case HOUR: + startOfTimeGranularity = startOfMinute.withMinute(0); + break; + case DAY: + startOfTimeGranularity = startOfMinute.withHour(0).withMinute(0); + break; + case MONTH: + startOfTimeGranularity = startOfMinute.withDayOfMonth(1).withHour(0).withMinute(0); + break; + } + // The last millis of the start granularity + return startOfTimeGranularity.minus(1, ChronoUnit.MILLIS).toInstant().toEpochMilli(); + } + + @Override + public Result verify(FileSystemDataset dataset) { + Result res = super.verify(dataset); + if 
(!dataset.datasetRoot().toString().contains(precedingTimeDatasetPartitionName)) { + return res; + } + + // set compaction watermark + this.state.setProp(COMPACTION_WATERMARK, watermarkTime); + if (enabled && res.isSuccessful()) { + log.info("Set dataset {} complete and compaction watermark {}", dataset.datasetRoot(), watermarkTime); + // If it also passed completeness check + this.state.setProp(COMPLETION_COMPACTION_WATERMARK, watermarkTime); + } else { + log.info("Set dataset {} compaction watermark {}", dataset.datasetRoot(), watermarkTime); + } + return res; + } +} diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/action/CompactionWatermarkActionTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/action/CompactionWatermarkActionTest.java new file mode 100644 index 0000000..038268c --- /dev/null +++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/action/CompactionWatermarkActionTest.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.compaction.action; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.fs.Path; +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.google.common.base.Optional; + +import org.apache.gobblin.compaction.mapreduce.MRCompactor; +import org.apache.gobblin.compaction.verify.CompactionWatermarkChecker; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.data.management.dataset.SimpleFileSystemDataset; +import org.apache.gobblin.dataset.FileSystemDataset; +import org.apache.gobblin.hive.HivePartition; +import org.apache.gobblin.hive.HiveRegister; +import org.apache.gobblin.hive.HiveRegistrationUnit; +import org.apache.gobblin.hive.HiveTable; +import org.apache.gobblin.hive.spec.HiveSpec; + + +public class CompactionWatermarkActionTest { + + private final String compactionWatermark = "compactionWatermark"; + private final String completionCompactionWatermark = "completionAndCompactionWatermark"; + + @Test + public void testUpdateWatermark() + throws Exception { + doTestUpdateWatermark("tracking","event1", "event1"); + doTestUpdateWatermark("db1","table1", "db1/table1"); + } + + private void doTestUpdateWatermark(String db, String table, String dataset) + throws Exception { + State state = new State(); + String defaultDb = "tracking"; + state.setProp(CompactionWatermarkAction.DEFAULT_HIVE_DB, defaultDb); + + String inputDir = "/data/tracking"; + String inputSubDir = "hourly"; + String destSubDir = "daily"; + String datasetPath = String.format("%s/%s/%s/2019/12/20", inputDir, dataset, inputSubDir); + state.setProp(MRCompactor.COMPACTION_INPUT_DIR, inputDir); + state.setProp(MRCompactor.COMPACTION_DEST_DIR, inputDir); + state.setProp(MRCompactor.COMPACTION_INPUT_SUBDIR, inputSubDir); + state.setProp(MRCompactor.COMPACTION_DEST_SUBDIR, destSubDir); + state.setProp(HiveRegister.HIVE_REGISTER_TYPE, MockHiveRegister.class.getName()); + 
state.setProp(CompactionWatermarkAction.GRANULARITY, "DAY"); + + State tableProps = new State(); + // 2019-12-31 23:59:59.999 + String existingWatermark = "1577836799999"; + tableProps.setProp(compactionWatermark, existingWatermark); + tableProps.setProp(completionCompactionWatermark, existingWatermark); + HiveTable existingTable = new HiveTable.Builder().withDbName(db).withTableName(table) + .withProps(tableProps).build(); + MockHiveRegister.existingTable = existingTable; + + CompactionWatermarkAction action = new CompactionWatermarkAction(state); + FileSystemDataset fsDataset = new SimpleFileSystemDataset(new Path(datasetPath)); + + // Will not update if old watermarks are reported + String actualWatermark = "1577750399999"; // 2019-12-30 23:59:59.999 + doWatermarkTest(action, fsDataset, state, actualWatermark, existingWatermark); + + // Will throw a runtime exception if watermark is not continuous + Exception exception = null; + try { + actualWatermark = "1578009599999"; // 2020-01-02 23:59:59.999 + doWatermarkTest(action, fsDataset, state, actualWatermark, actualWatermark); + } catch (Exception e) { + exception = e; + } + Assert.assertEquals(exception.getMessage(), + String.format("Fail to advance %s of dataset %s: expect 1577923199999 but got %s, " + + "please manually fill the gap and rerun.", + compactionWatermark, fsDataset.datasetRoot(), actualWatermark)); + + // Will update if newer continuous watermarks are reported + actualWatermark = "1577923199999"; // 2020-01-01 23:59:59.999 + doWatermarkTest(action, fsDataset, state, actualWatermark, actualWatermark); + } + + private void doWatermarkTest(CompactionWatermarkAction action, FileSystemDataset fsDataset, + State state, String actualWatermark, String expectedWatermark) + throws Exception { + state.setProp(CompactionWatermarkChecker.COMPACTION_WATERMARK, actualWatermark); + state.setProp(CompactionWatermarkChecker.COMPLETION_COMPACTION_WATERMARK, actualWatermark); + + 
action.onCompactionJobComplete(fsDataset); + + Assert.assertEquals(MockHiveRegister.existingTable.getProps().getProp(compactionWatermark), + expectedWatermark); + Assert.assertEquals(MockHiveRegister.existingTable.getProps().getProp(completionCompactionWatermark), + expectedWatermark); + } + + public static class MockHiveRegister extends HiveRegister { + + static HiveTable existingTable; + + public MockHiveRegister(State state, Optional<String> uri) { + super(state); + } + + @Override + protected void registerPath(HiveSpec spec) + throws IOException { + + } + + @Override + public boolean createDbIfNotExists(String dbName) + throws IOException { + return false; + } + + @Override + public boolean createTableIfNotExists(HiveTable table) + throws IOException { + return false; + } + + @Override + public boolean addPartitionIfNotExists(HiveTable table, HivePartition partition) + throws IOException { + return false; + } + + @Override + public boolean existsTable(String dbName, String tableName) + throws IOException { + return false; + } + + @Override + public boolean existsPartition(String dbName, String tableName, List<HiveRegistrationUnit.Column> partitionKeys, + List<String> partitionValues) + throws IOException { + return false; + } + + @Override + public void dropTableIfExists(String dbName, String tableName) + throws IOException { + + } + + @Override + public void dropPartitionIfExists(String dbName, String tableName, List<HiveRegistrationUnit.Column> partitionKeys, + List<String> partitionValues) + throws IOException { + + } + + @Override + public Optional<HiveTable> getTable(String dbName, String tableName) + throws IOException { + if (dbName.equals(existingTable.getDbName()) + && tableName.equals(existingTable.getTableName())) { + return Optional.of(existingTable); + } + return Optional.absent(); + } + + @Override + public Optional<HivePartition> getPartition(String dbName, String tableName, + List<HiveRegistrationUnit.Column> partitionKeys, List<String> 
partitionValues) + throws IOException { + return null; + } + + @Override + public void alterTable(HiveTable table) + throws IOException { + existingTable = table; + } + + @Override + public void alterPartition(HiveTable table, HivePartition partition) + throws IOException { + + } + } +} diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/CompactionWatermarkCheckerTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/CompactionWatermarkCheckerTest.java new file mode 100644 index 0000000..b2c26ee --- /dev/null +++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/CompactionWatermarkCheckerTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.compaction.verify; + +import java.time.ZoneId; +import java.time.ZonedDateTime; + +import org.apache.hadoop.fs.Path; +import org.testng.Assert; +import org.testng.annotations.Test; + +import org.apache.gobblin.compaction.source.CompactionSource; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.data.management.dataset.SimpleFileSystemDataset; +import org.apache.gobblin.dataset.FileSystemDataset; +import org.apache.gobblin.time.TimeIterator; + + +public class CompactionWatermarkCheckerTest { + + private final ZoneId zone = ZoneId.of("America/Los_Angeles"); + + @Test + public void testGetWatermark() { + // 2019/12/01 18:16:00.000 + ZonedDateTime time = ZonedDateTime.of(2019, 12, 1, 18, 16, 0, 0, zone); + // minute watermark is 2019/12/01 18:15:59.999 + Assert.assertEquals(CompactionWatermarkChecker.getWatermarkTimeMillis(time, TimeIterator.Granularity.MINUTE), + 1575252959999L); + // hour watermark is 2019/12/01 17:59:59.999 + Assert.assertEquals(CompactionWatermarkChecker.getWatermarkTimeMillis(time, TimeIterator.Granularity.HOUR), + 1575251999999L); + // day watermark is 2019/11/30 23:59:59.999 + Assert.assertEquals(CompactionWatermarkChecker.getWatermarkTimeMillis(time, TimeIterator.Granularity.DAY), + 1575187199999L); + // month watermark is 2019/11/30 23:59:59.999 + Assert.assertEquals(CompactionWatermarkChecker.getWatermarkTimeMillis(time, TimeIterator.Granularity.MONTH), + 1575187199999L); + } + + @Test + public void testVerify() { + ZonedDateTime time = ZonedDateTime.of(2019, 12, 1, 18, 16, 0, 0, zone); + State state = new State(); + state.setProp(CompactionSource.COMPACTION_INIT_TIME, time.toInstant().toEpochMilli()); + state.setProp(CompactionAuditCountVerifier.COMPACTION_COMMPLETENESS_GRANULARITY, "DAY"); + state.setProp(CompactionWatermarkChecker.TIME_FORMAT, "yyyy/MM/dd"); + + FileSystemDataset dataset1201 = new SimpleFileSystemDataset(new Path("/dataset/2019/12/01")); + FileSystemDataset 
dataset1130 = new SimpleFileSystemDataset(new Path("/dataset/2019/11/30")); + FileSystemDataset datasetDash = new SimpleFileSystemDataset(new Path("/dataset/datepartition=2019-11-30")); + + // CASE: completeness is disabled + state.setProp(CompactionAuditCountVerifier.COMPACTION_COMMPLETENESS_ENABLED, false); + doVerifyDataset(new State(state), dataset1201, null, null); + doVerifyDataset(new State(state), dataset1130, "1575187199999", null); + doVerifyDataset(new State(state), datasetDash, null, null); + + // CASE: completeness is enabled + state.setProp(CompactionAuditCountVerifier.COMPACTION_COMMPLETENESS_ENABLED, true); + doVerifyDataset(new State(state), dataset1201, null, null); + doVerifyDataset(new State(state), dataset1130, "1575187199999", "1575187199999"); + doVerifyDataset(new State(state), datasetDash, null, null); + } + + private void doVerifyDataset(State state, FileSystemDataset dataset, String compactionWatermark, String completionAndCompactionWatermark) { + CompactionWatermarkChecker checker = new CompactionWatermarkChecker(state); + checker.verify(dataset); + Assert.assertEquals(state.getProp(CompactionWatermarkChecker.COMPACTION_WATERMARK), compactionWatermark); + Assert.assertEquals(state.getProp(CompactionWatermarkChecker.COMPLETION_COMPACTION_WATERMARK), + completionAndCompactionWatermark); + } +} diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/TimePartitionGlobFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/TimePartitionGlobFinder.java index 8a707cc..14dbba0 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/TimePartitionGlobFinder.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/TimePartitionGlobFinder.java @@ -174,7 +174,14 @@ public class TimePartitionGlobFinder implements DatasetsFinder<FileSystemDataset // All 12 months return new StringBuilder("*"); } - 
StringBuilder monthOptions = new StringBuilder("{" + startMonth); + + // Append start month + StringBuilder monthOptions = new StringBuilder("{"); + if (startMonth < 10) { + monthOptions.append("0"); + } + monthOptions.append(startMonth); + if (endMonth >= startMonth) { appendOptions(monthOptions, startMonth + 1, endMonth); } else { diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java index 4646029..97eb940 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java @@ -186,6 +186,13 @@ public class TimePartitionedGlobFinderTest { "{2018,2019}/{11,12,01}/*/*"); Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, end, "yyyy-MM-dd-HH"), "{2018,2019}-{11,12,01}*"); + + // 2019/1/1 - 2019/1/3 + start = start.withYear(2019).withMonth(1).withDayOfMonth(1); + Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, end, slashTimeFormat), + "{2019}/{01}/*"); + Assert.assertEquals(TimePartitionGlobFinder.derivePartitionPattern(start, end, dashTimeFormat), + "{2019}-{01}*"); } @Test