This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
     new 1de3f7e  [GOBBLIN-1011] adjust compaction flow to work with virtual partition
1de3f7e is described below

commit 1de3f7e091f85fe47cdac9f554ca8217f0a028c0
Author: zhchen <zhc...@linkedin.com>
AuthorDate: Mon Dec 23 18:22:14 2019 -0800

    [GOBBLIN-1011] adjust compaction flow to work with virtual partition

    Closes #2856 from zxcware/comp2
---
 .../apache/gobblin/dataset/FileSystemDataset.java  |  6 ++
 .../CompactionCompleteFileOperationAction.java     |  5 ++
 .../action/CompactionHiveRegistrationAction.java   |  5 ++
 .../action/CompactionMarkDirectoryAction.java      |  5 ++
 .../compaction/mapreduce/MRCompactionTask.java     |  9 +++
 .../compaction/suite/CompactionSuiteBase.java      | 49 ++++++++------
 .../suite/CompactionSuiteBaseFactory.java          |  2 +-
 .../verify/CompactionAuditCountVerifier.java       | 29 ++++++--
 .../verify/CompactionThresholdVerifier.java        |  6 +-
 .../mapreduce/AvroCompactionTaskTest.java          | 37 +++++++++++
 .../dataset/SimpleFileSystemDataset.java           | 12 ++--
 .../java/org/apache/gobblin/time/TimeIterator.java | 49 ++++++++++----
 .../dataset/TimePartitionedGlobFinderTest.java     |  8 +--
 .../org/apache/gobblin/time/TimeIteratorTest.java  | 77 ++++++++++++++++++++++
 .../hive/metastore/HiveMetaStoreBasedRegister.java | 13 +++-
 .../gobblin/hive/metastore/HiveMetaStoreUtils.java |  2 +-
 .../org/apache/gobblin/runtime/TaskExecutor.java   |  3 +-
 17 files changed, 262 insertions(+), 55 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/FileSystemDataset.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/FileSystemDataset.java
index 2c5051c..5129fc7 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/dataset/FileSystemDataset.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/FileSystemDataset.java
@@ -27,4 +27,10 @@ public interface FileSystemDataset extends Dataset {
 
   public Path datasetRoot();
 
+  /**
+   * @return true if the dataset doesn't have a physical file/folder
+   */
+  default boolean isVirtual() {
+    return false;
+  }
 }
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
index e4fb747..02e0578 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
@@ -33,6 +33,7 @@ import org.apache.gobblin.compaction.parser.CompactionPathParser;
 import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.dataset.SimpleFileSystemDataset;
 import org.apache.gobblin.dataset.FileSystemDataset;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.util.HadoopUtils;
@@ -73,6 +74,10 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete
    * Create a record count file containing the number of records that have been processed .
    */
   public void onCompactionJobComplete (FileSystemDataset dataset) throws IOException {
+    if (dataset.isVirtual()) {
+      return;
+    }
+
     if (configurator != null && configurator.isJobCreated()) {
       CompactionPathParser.CompactionParserResult result = new CompactionPathParser(state).parse(dataset);
       Path tmpPath = configurator.getMrOutputPath();
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
index a7536d3..b1a4faa 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
@@ -34,6 +34,7 @@ import org.apache.gobblin.compaction.parser.CompactionPathParser;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.dataset.SimpleFileSystemDataset;
 import org.apache.gobblin.dataset.FileSystemDataset;
 import org.apache.gobblin.hive.HiveRegister;
 import org.apache.gobblin.hive.policy.HiveRegistrationPolicy;
@@ -57,6 +58,10 @@ public class CompactionHiveRegistrationAction implements CompactionCompleteActio
   }
 
   public void onCompactionJobComplete(FileSystemDataset dataset) throws IOException {
+    if (dataset.isVirtual()) {
+      return;
+    }
+
     if (state.contains(ConfigurationKeys.HIVE_REGISTRATION_POLICY)) {
       HiveRegister hiveRegister = HiveRegister.get(state);
       HiveRegistrationPolicy hiveRegistrationPolicy = HiveRegistrationPolicyBase.getPolicy(state);
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionMarkDirectoryAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionMarkDirectoryAction.java
index ac1f1d7..4f10cba 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionMarkDirectoryAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionMarkDirectoryAction.java
@@ -36,6 +36,7 @@ import org.apache.gobblin.compaction.mapreduce.CompactionAvroJobConfigurator;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.dataset.SimpleFileSystemDataset;
 import org.apache.gobblin.dataset.FileSystemDataset;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 
@@ -57,6 +58,10 @@ public class CompactionMarkDirectoryAction implements CompactionCompleteAction<F
   }
 
   public void onCompactionJobComplete (FileSystemDataset dataset) throws IOException {
+    if (dataset.isVirtual()) {
+      return;
+    }
+
     boolean renamingRequired = this.state.getPropAsBoolean(MRCompactor.COMPACTION_RENAME_SOURCE_DIR_ENABLED,
         MRCompactor.DEFAULT_COMPACTION_RENAME_SOURCE_DIR_ENABLED);
 
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
index 78ed1c2..3270819 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
@@ -32,7 +32,9 @@ import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
 import org.apache.gobblin.compaction.suite.CompactionSuite;
 import org.apache.gobblin.compaction.suite.CompactionSuiteUtils;
 import org.apache.gobblin.compaction.verify.CompactionVerifier;
+import org.apache.gobblin.data.management.dataset.SimpleFileSystemDataset;
 import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.dataset.FileSystemDataset;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.runtime.TaskContext;
 import org.apache.gobblin.runtime.mapreduce.MRTask;
@@ -79,6 +81,13 @@ public class MRCompactionTask extends MRTask {
       }
     }
 
+    if (dataset instanceof FileSystemDataset
+        && ((FileSystemDataset)dataset).isVirtual()) {
+      log.info("A trivial compaction job as there is no physical data. Will trigger a success complete directly");
+      this.onMRTaskComplete(true, null);
+      return;
+    }
+
     super.run();
   }
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBase.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBase.java
index a263e0c..6aa0c53 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBase.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBase.java
@@ -23,9 +23,11 @@ import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.gobblin.compaction.mapreduce.CompactionJobConfigurator;
-import org.apache.hadoop.fs.Path;
+
 import org.apache.hadoop.mapreduce.Job;
 
+import com.google.gson.Gson;
+
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.compaction.action.CompactionCompleteAction;
@@ -36,8 +38,11 @@ import org.apache.gobblin.compaction.verify.CompactionAuditCountVerifier;
 import org.apache.gobblin.compaction.verify.CompactionThresholdVerifier;
 import org.apache.gobblin.compaction.verify.CompactionTimeRangeVerifier;
 import org.apache.gobblin.compaction.verify.CompactionVerifier;
+import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.dataset.FileSystemDataset;
+import org.apache.gobblin.util.io.GsonInterfaceAdapter;
+
 
 /**
  * A type of {@link CompactionSuite} which implements all components needed for file compaction.
@@ -45,9 +50,15 @@ import org.apache.gobblin.dataset.FileSystemDataset;
  */
 @Slf4j
 public class CompactionSuiteBase implements CompactionSuite<FileSystemDataset> {
-  public static final String SERIALIZE_COMPACTION_FILE_PATH_NAME = "compaction-file-path-name";
-  private State state;
-  private CompactionJobConfigurator configurator = null;
+
+  protected State state;
+  /**
+   * Require lazy evaluation for now to support feature in
+   * {@link org.apache.gobblin.compaction.source.CompactionSource#optionalInit(SourceState)}
+   */
+  private CompactionJobConfigurator configurator;
+  private static final Gson GSON = GsonInterfaceAdapter.getGson(FileSystemDataset.class);
+  private static final String SERIALIZED_DATASET = "compaction.serializedDataset";
 
   /**
    * Constructor
@@ -85,7 +96,7 @@ public class CompactionSuiteBase implements CompactionSuite<FileSystemDataset> {
    * @param state A state that is used to save {@link org.apache.gobblin.dataset.Dataset}
    */
   public void save (FileSystemDataset dataset, State state) {
-    state.setProp(SERIALIZE_COMPACTION_FILE_PATH_NAME, dataset.datasetURN());
+    state.setProp(SERIALIZED_DATASET, GSON.toJson(dataset));
   }
 
   /**
@@ -95,17 +106,7 @@ public class CompactionSuiteBase implements CompactionSuite<FileSystemDataset> {
    * @return A new instance of {@link FileSystemDataset}
    */
   public FileSystemDataset load (final State state) {
-    return new FileSystemDataset() {
-      @Override
-      public Path datasetRoot() {
-        return new Path(state.getProp(SERIALIZE_COMPACTION_FILE_PATH_NAME));
-      }
-
-      @Override
-      public String datasetURN() {
-        return state.getProp(SERIALIZE_COMPACTION_FILE_PATH_NAME);
-      }
-    };
+    return GSON.fromJson(state.getProp(SERIALIZED_DATASET), FileSystemDataset.class);
   }
 
   /**
@@ -116,9 +117,9 @@ public class CompactionSuiteBase implements CompactionSuite<FileSystemDataset> {
    */
   public List<CompactionCompleteAction<FileSystemDataset>> getCompactionCompleteActions() {
     ArrayList<CompactionCompleteAction<FileSystemDataset>> array = new ArrayList<>();
-    array.add(new CompactionCompleteFileOperationAction(state, configurator));
+    array.add(new CompactionCompleteFileOperationAction(state, getConfigurator()));
     array.add(new CompactionHiveRegistrationAction(state));
-    array.add(new CompactionMarkDirectoryAction(state, configurator));
+    array.add(new CompactionMarkDirectoryAction(state, getConfigurator()));
     return array;
   }
 
@@ -130,7 +131,15 @@ public class CompactionSuiteBase implements CompactionSuite<FileSystemDataset> {
    * @return a map-reduce job which will compact files against {@link org.apache.gobblin.dataset.Dataset}
    */
   public Job createJob (FileSystemDataset dataset) throws IOException {
-    configurator = CompactionJobConfigurator.instantiateConfigurator(this.state);
-    return configurator.createJob(dataset);
+    return getConfigurator().createJob(dataset);
+  }
+
+  protected CompactionJobConfigurator getConfigurator() {
+    if (configurator == null) {
+      synchronized(this) {
+        configurator = CompactionJobConfigurator.instantiateConfigurator(this.state);
+      }
+    }
+    return configurator;
   }
 }
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseFactory.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseFactory.java
index 827fe28..3bada2c 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseFactory.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseFactory.java
@@ -28,4 +28,4 @@ public class CompactionSuiteBaseFactory implements CompactionSuiteFactory {
   public CompactionSuiteBase createSuite (State state) {
     return new CompactionSuiteBase(state);
   }
-}
+}
\ No newline at end of file
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
index 7c417df..67039e9 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
@@ -18,6 +18,9 @@ package org.apache.gobblin.compaction.verify;
 
 import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Map;
@@ -35,6 +38,7 @@ import org.apache.gobblin.compaction.mapreduce.MRCompactor;
 import org.apache.gobblin.compaction.parser.CompactionPathParser;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.dataset.FileSystemDataset;
+import org.apache.gobblin.time.TimeIterator;
 import org.apache.gobblin.util.ClassAliasResolver;
 
 /**
@@ -46,6 +50,8 @@ import org.apache.gobblin.util.ClassAliasResolver;
 public class CompactionAuditCountVerifier implements CompactionVerifier<FileSystemDataset> {
 
   public static final String COMPACTION_COMPLETENESS_THRESHOLD = MRCompactor.COMPACTION_PREFIX + "completeness.threshold";
+  public static final String COMPACTION_COMMPLETENESS_ENABLED = MRCompactor.COMPACTION_PREFIX + "completeness.enabled";
+  public static final String COMPACTION_COMMPLETENESS_GRANULARITY = MRCompactor.COMPACTION_PREFIX + "completeness.granularity";
   public static final double DEFAULT_COMPACTION_COMPLETENESS_THRESHOLD = 0.99;
   public static final String PRODUCER_TIER = "producer.tier";
   public static final String ORIGIN_TIER = "origin.tier";
@@ -56,9 +62,13 @@ public class CompactionAuditCountVerifier implements CompactionVerifier<FileSyst
   private String producerTier;
   private String gobblinTier;
   private double threshold;
-  private final State state;
+  protected final State state;
   private final AuditCountClient auditCountClient;
 
+  protected final boolean enabled;
+  protected final TimeIterator.Granularity granularity;
+  protected final ZoneId zone;
+
   /**
    * Constructor with default audit count client
    */
@@ -72,6 +82,10 @@ public class CompactionAuditCountVerifier implements CompactionVerifier<FileSyst
   public CompactionAuditCountVerifier (State state, AuditCountClient client) {
     this.auditCountClient = client;
     this.state = state;
+    this.enabled = state.getPropAsBoolean(COMPACTION_COMMPLETENESS_ENABLED, true);
+    this.granularity = TimeIterator.Granularity.valueOf(
+        state.getProp(COMPACTION_COMMPLETENESS_GRANULARITY, "HOUR"));
+    this.zone = ZoneId.of(state.getProp(MRCompactor.COMPACTION_TIMEZONE, MRCompactor.DEFAULT_COMPACTION_TIMEZONE));
 
     // retrieve all tiers information
     if (client != null) {
@@ -93,7 +107,6 @@ public class CompactionAuditCountVerifier implements CompactionVerifier<FileSyst
    * returned which creates a <code>null</code> {@link AuditCountClient}
    */
   private static AuditCountClientFactory getClientFactory (State state) {
-
     if (!state.contains(AuditCountClientFactory.AUDIT_COUNT_CLIENT_FACTORY)) {
       return new EmptyAuditCountClientFactory ();
     }
@@ -118,17 +131,21 @@
    * @return If verification is succeeded
    */
   public Result verify (FileSystemDataset dataset) {
+    if (!enabled) {
+      return new Result(true, "");
+    }
     if (auditCountClient == null) {
       log.debug("No audit count client specified, skipped");
       return new Result(true, "");
     }
 
-    CompactionPathParser.CompactionParserResult result = new CompactionPathParser(this.state).parse(dataset);
-    DateTime startTime = result.getTime();
-    DateTime endTime = startTime.plusHours(1);
+    CompactionPathParser.CompactionParserResult result = new CompactionPathParser(state).parse(dataset);
+    ZonedDateTime startTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(result.getTime().getMillis()), zone);
+    ZonedDateTime endTime = TimeIterator.inc(startTime, granularity, 1);
     String datasetName = result.getDatasetName();
     try {
-      Map<String, Long> countsByTier = auditCountClient.fetch (datasetName, startTime.getMillis(), endTime.getMillis());
+      Map<String, Long> countsByTier = auditCountClient.fetch(datasetName,
+          startTime.toInstant().toEpochMilli(), endTime.toInstant().toEpochMilli());
       for (String tier: referenceTiers) {
         Result rst = passed (datasetName, countsByTier, tier);
         if (rst.isSuccessful()) {
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
index 03ab36d..0eed686 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
@@ -32,6 +32,7 @@ import org.apache.gobblin.compaction.conditions.RecompactionConditionBasedOnRati
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
 import org.apache.gobblin.compaction.parser.CompactionPathParser;
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.data.management.dataset.SimpleFileSystemDataset;
 import org.apache.gobblin.dataset.FileSystemDataset;
 
 /**
@@ -74,7 +75,10 @@ public class CompactionThresholdVerifier implements CompactionVerifier<FileSyste
     InputRecordCountHelper helper = new InputRecordCountHelper(state);
 
     try {
-      double newRecords = helper.calculateRecordCount (Lists.newArrayList(new Path(dataset.datasetURN())));
+      double newRecords = 0;
+      if (!dataset.isVirtual()) {
+        newRecords = helper.calculateRecordCount (Lists.newArrayList(new Path(dataset.datasetURN())));
+      }
       double oldRecords = helper.readRecordCount (new Path(result.getDstAbsoluteDir()));
 
       if (oldRecords == 0) {
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
index a39658e..19d01fa 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
@@ -47,7 +47,9 @@ import org.apache.gobblin.compaction.verify.CompactionAuditCountVerifier;
 import org.apache.gobblin.compaction.verify.CompactionVerifier;
 import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.data.management.dataset.DatasetUtils;
 import org.apache.gobblin.data.management.dataset.SimpleDatasetHierarchicalPrioritizer;
+import org.apache.gobblin.data.management.dataset.TimePartitionGlobFinder;
 import org.apache.gobblin.data.management.retention.profile.ConfigurableGlobDatasetFinder;
 import org.apache.gobblin.runtime.api.JobExecutionResult;
 import org.apache.gobblin.runtime.embedded.EmbeddedGobblin;
 
@@ -104,6 +106,41 @@ public class AvroCompactionTaskTest {
   }
 
   @Test
+  public void testCompactVirtualDataset() throws Exception {
+
+    File basePath = Files.createTempDir();
+    basePath.deleteOnExit();
+
+    File jobDir = new File(basePath, "PageViewEvent");
+    Assert.assertTrue(jobDir.mkdirs());
+
+    String pattern = new Path(basePath.getAbsolutePath(), "*").toString();
+    String jobName = "compaction-virtual";
+
+    EmbeddedGobblin embeddedGobblin = new EmbeddedGobblin(jobName)
+        .setConfiguration(ConfigurationKeys.SOURCE_CLASS_KEY, CompactionSource.class.getName())
+        .setConfiguration(ConfigurableGlobDatasetFinder.DATASET_FINDER_PATTERN_KEY, pattern)
+        .setConfiguration(MRCompactor.COMPACTION_INPUT_DIR, basePath.toString())
+        .setConfiguration(MRCompactor.COMPACTION_INPUT_SUBDIR, "hourly")
+        .setConfiguration(MRCompactor.COMPACTION_DEST_DIR, basePath.toString())
+        .setConfiguration(MRCompactor.COMPACTION_DEST_SUBDIR, "daily")
+        .setConfiguration(MRCompactor.COMPACTION_TMP_DEST_DIR, "/tmp/compaction/" + jobName)
+        .setConfiguration(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MAX_TIME_AGO, "3d")
+        .setConfiguration(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MIN_TIME_AGO, "1d")
+        .setConfiguration(ConfigurationKeys.MAX_TASK_RETRIES_KEY, "0")
+        .setConfiguration(DatasetUtils.DATASET_PROFILE_CLASS_KEY,
+            "org.apache.gobblin.data.management.dataset.TimePartitionGlobFinder")
+        .setConfiguration(TimePartitionGlobFinder.PARTITION_PREFIX, "hourly/")
+        .setConfiguration(TimePartitionGlobFinder.TIME_FORMAT, "yyyy/MM/dd")
+        .setConfiguration(TimePartitionGlobFinder.GRANULARITY, "DAY")
+        .setConfiguration(TimePartitionGlobFinder.LOOKBACK_SPEC, "P3D")
+        .setConfiguration(TimePartitionGlobFinder.ENABLE_VIRTUAL_PARTITION, "true");
+
+    JobExecutionResult result = embeddedGobblin.run();
+    Assert.assertTrue(result.isSuccessful());
+  }
+
+  @Test
   public void testRecompaction () throws Exception {
     FileSystem fs = getFileSystem();
     String basePath = "/tmp/testRecompaction";
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/SimpleFileSystemDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/SimpleFileSystemDataset.java
index 338596c..e1822b8 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/SimpleFileSystemDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/SimpleFileSystemDataset.java
@@ -29,7 +29,7 @@ import org.apache.gobblin.dataset.FileSystemDataset;
 public class SimpleFileSystemDataset implements FileSystemDataset {
 
   private final Path path;
-  private final boolean isVirtual;
+  private final boolean _isVirtual;
 
   public SimpleFileSystemDataset(Path path) {
     this(path, false);
@@ -37,7 +37,7 @@ public class SimpleFileSystemDataset implements FileSystemDataset {
 
   public SimpleFileSystemDataset(Path path, boolean isVirtual) {
     this.path = path;
-    this.isVirtual = isVirtual;
+    _isVirtual = isVirtual;
   }
 
   @Override
@@ -50,10 +50,8 @@ public class SimpleFileSystemDataset implements FileSystemDataset {
     return path.toString();
   }
 
-  /**
-   * @return true if the dataset doesn't have a physical file/folder
-   */
-  public boolean getIsVirtual() {
-    return isVirtual;
+  @Override
+  public boolean isVirtual() {
+    return _isVirtual;
   }
 }
\ No newline at end of file
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/time/TimeIterator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/time/TimeIterator.java
index 4630a7a..a9ef4d4 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/time/TimeIterator.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/time/TimeIterator.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.time;
 
 import java.time.ZonedDateTime;
 import java.util.Iterator;
+import java.util.NoSuchElementException;
 
 
 /**
@@ -48,28 +49,50 @@ public class TimeIterator implements Iterator {
 
   @Override
   public ZonedDateTime next() {
+    if (startTime.isAfter(endTime)) {
+      throw new NoSuchElementException();
+    }
     ZonedDateTime dateTime = startTime;
+    startTime = inc(startTime, granularity, 1);
+    return dateTime;
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Increase the given time by {@code units}, which must be positive, of {@code granularity}
+   */
+  public static ZonedDateTime inc(ZonedDateTime time, Granularity granularity, long units) {
     switch (granularity) {
       case MINUTE:
-        startTime = startTime.plusMinutes(1);
-        break;
+        return time.plusMinutes(units);
       case HOUR:
-        startTime = startTime.plusHours(1);
-        break;
+        return time.plusHours(units);
      case DAY:
-        startTime = startTime.plusDays(1);
-        break;
+        return time.plusDays(units);
      case MONTH:
-        startTime = startTime.plusMonths(1);
-        break;
+        return time.plusMonths(units);
     }
-
-    return dateTime;
+    throw new RuntimeException("Unsupported granularity: " + granularity);
   }
 
-  @Override
-  public void remove() {
-    throw new UnsupportedOperationException();
+  /**
+   * Decrease the given time by {@code units}, which must be positive, of {@code granularity}
+   */
+  public static ZonedDateTime dec(ZonedDateTime time, Granularity granularity, long units) {
+    switch (granularity) {
+      case MINUTE:
+        return time.minusMinutes(units);
+      case HOUR:
+        return time.minusHours(units);
+      case DAY:
+        return time.minusDays(units);
+      case MONTH:
+        return time.minusMonths(units);
+    }
+    throw new RuntimeException("Unsupported granularity: " + granularity);
   }
 }
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java
index fca8bb3..4646029 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java
@@ -97,11 +97,11 @@ public class TimePartitionedGlobFinderTest {
     datasets = finder.findDatasets();
     Assert.assertEquals(datasets.size(), 6);
     // Verify virtual partitions for /db1/table1
-    Assert.assertTrue(find(getPartitionPath(ds1, hourlyPrefix, -1, dayFormat), datasets).getIsVirtual());
-    Assert.assertTrue(find(getPartitionPath(ds1, hourlyPrefix, -2, dayFormat), datasets).getIsVirtual());
+    Assert.assertTrue(find(getPartitionPath(ds1, hourlyPrefix, -1, dayFormat), datasets).isVirtual());
+    Assert.assertTrue(find(getPartitionPath(ds1, hourlyPrefix, -2, dayFormat), datasets).isVirtual());
     // Verify virtual partitions for /db2/table2
-    Assert.assertTrue(find(getPartitionPath(ds2, hourlyPrefix, 0, dayFormat), datasets).getIsVirtual());
-    Assert.assertTrue(find(getPartitionPath(ds2, hourlyPrefix, -2, dayFormat), datasets).getIsVirtual());
+    Assert.assertTrue(find(getPartitionPath(ds2, hourlyPrefix, 0, dayFormat), datasets).isVirtual());
+    Assert.assertTrue(find(getPartitionPath(ds2, hourlyPrefix, -2, dayFormat), datasets).isVirtual());
   }
 
   private Path getPartitionPath(Path dataset, String prefix, int dayOffset, String format) {
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/time/TimeIteratorTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/time/TimeIteratorTest.java
new file mode 100644
index 0000000..50c4e08
--- /dev/null
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/time/TimeIteratorTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.time;
+
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Unit tests for {@link TimeIterator}
+ */
+public class TimeIteratorTest {
+
+  private ZoneId zone = ZoneId.of("America/Los_Angeles");
+
+  /**
+   * A representative unit test to cover iterating. Actual computations are covered by {@link #testInc()}
+   */
+  @Test
+  public void testIterator() {
+    ZonedDateTime startTime = ZonedDateTime.of(2019,12,20,11,
+        20,30, 0, zone);
+    ZonedDateTime endTime = startTime.plusDays(12);
+    TimeIterator iterator = new TimeIterator(startTime, endTime, TimeIterator.Granularity.DAY);
+    int days = 0;
+    while (iterator.hasNext()) {
+      Assert.assertEquals(iterator.next(), startTime.plusDays(days++));
+    }
+    Assert.assertEquals(days, 13);
+  }
+
+  @Test
+  public void testInc() {
+    ZonedDateTime startTime = ZonedDateTime.of(2019,12,20,11,
+        20,30, 0, zone);
+    Assert.assertEquals(TimeIterator.inc(startTime, TimeIterator.Granularity.MINUTE, 40).toString(),
+        "2019-12-20T12:00:30-08:00[America/Los_Angeles]");
+    Assert.assertEquals(TimeIterator.inc(startTime, TimeIterator.Granularity.HOUR, 13).toString(),
+        "2019-12-21T00:20:30-08:00[America/Los_Angeles]");
+    Assert.assertEquals(TimeIterator.inc(startTime, TimeIterator.Granularity.DAY, 12).toString(),
+        "2020-01-01T11:20:30-08:00[America/Los_Angeles]");
+    Assert.assertEquals(TimeIterator.inc(startTime, TimeIterator.Granularity.MONTH, 1).toString(),
+        "2020-01-20T11:20:30-08:00[America/Los_Angeles]");
+  }
+
+  @Test
+  public void testDec() {
+    ZonedDateTime startTime = ZonedDateTime.of(2019,12,20,11,
+        20,30, 0, zone);
+    Assert.assertEquals(TimeIterator.dec(startTime, TimeIterator.Granularity.MINUTE, 21).toString(),
+        "2019-12-20T10:59:30-08:00[America/Los_Angeles]");
+    Assert.assertEquals(TimeIterator.dec(startTime, TimeIterator.Granularity.HOUR, 12).toString(),
+        "2019-12-19T23:20:30-08:00[America/Los_Angeles]");
+    Assert.assertEquals(TimeIterator.dec(startTime, TimeIterator.Granularity.DAY, 20).toString(),
+        "2019-11-30T11:20:30-08:00[America/Los_Angeles]");
+    Assert.assertEquals(TimeIterator.dec(startTime, TimeIterator.Granularity.MONTH, 12).toString(),
+        "2018-12-20T11:20:30-08:00[America/Los_Angeles]");
+  }
+}
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
index a25efdd..15f5982 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 
 import lombok.extern.slf4j.Slf4j;
@@ -473,6 +474,10 @@ public class HiveMetaStoreBasedRegister extends HiveRegister {
   public void dropPartitionIfExists(String dbName, String tableName, List<Column> partitionKeys, List<String> partitionValues) throws IOException {
     try (AutoReturnableObject<IMetaStoreClient> client = this.clientPool.getClient()) {
+      if (client.get().getPartition(dbName, tableName, partitionValues) == null) {
+        // Partition does not exist. Nothing to do
+        return;
+      }
       try (Timer.Context context = this.metricContext.timer(DROP_TABLE).time()) {
         client.get().dropPartition(dbName, tableName, partitionValues, false);
       }
@@ -755,6 +760,12 @@ public class HiveMetaStoreBasedRegister extends HiveRegister {
    * @param existingTable
    */
   protected Table getNewTblByMergingExistingTblProps(Table newTable, HiveTable existingTable) {
-    return getTableWithCreateTime(newTable, existingTable);
+    Table table = getTableWithCreateTime(newTable, existingTable);
+    // Get existing parameters
+    Map<String, String> allParameters = HiveMetaStoreUtils.getParameters(existingTable.getProps());
+    // Apply new parameters
+    allParameters.putAll(table.getParameters());
+    table.setParameters(allParameters);
+    return table;
   }
 }
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
index 976c9e3..7957e62 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
@@ -189,7 +189,7 @@ public class HiveMetaStoreUtils {
     return hivePartition;
   }
 
-  private static Map<String, String> getParameters(State props) {
+  public static Map<String, String> getParameters(State props) {
     Map<String, String> parameters = Maps.newHashMap();
     if (props.contains(RUNTIME_PROPS)) {
       String runtimePropsString = props.getProp(RUNTIME_PROPS);
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
index b868893..755d972 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
@@ -441,9 +441,10 @@ public class TaskExecutor extends AbstractIdleService {
       onStart(startTime);
       try {
         this.underlyingTask.run();
-        successfulTaskCount.mark();;
+        successfulTaskCount.mark();
       } catch (Exception e) {
         failedTaskCount.mark();
+        LOG.error(String.format("Task %s failed", underlyingTask.getTaskId()), e);
         throw e;
       } finally {
         runningTaskCount.dec();
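
For readers skimming the patch: the heart of the change is that CompactionSuiteBase now serializes the whole FileSystemDataset (including the new isVirtual flag) into the work-unit state via Gson, instead of persisting only the dataset path. The sketch below is illustrative only and is not part of the commit (the class name and the sample partition path are made up); it shows how the save/load round trip is expected to behave with the same GsonInterfaceAdapter used in the patch:

    import org.apache.gobblin.configuration.State;
    import org.apache.gobblin.data.management.dataset.SimpleFileSystemDataset;
    import org.apache.gobblin.dataset.FileSystemDataset;
    import org.apache.gobblin.util.io.GsonInterfaceAdapter;
    import org.apache.hadoop.fs.Path;

    import com.google.gson.Gson;

    public class VirtualDatasetRoundTripSketch {
      // Same adapter the patch registers in CompactionSuiteBase, keyed on the interface type
      private static final Gson GSON = GsonInterfaceAdapter.getGson(FileSystemDataset.class);
      // Mirrors the patch's "compaction.serializedDataset" state key
      private static final String SERIALIZED_DATASET = "compaction.serializedDataset";

      public static void main(String[] args) {
        // A virtual (no physical folder) partition, as TimePartitionGlobFinder may produce
        FileSystemDataset virtualPartition =
            new SimpleFileSystemDataset(new Path("/data/PageViewEvent/hourly/2019/12/22"), true);

        // save(): the concrete SimpleFileSystemDataset is written into the work-unit state
        State state = new State();
        state.setProp(SERIALIZED_DATASET, GSON.toJson(virtualPartition));

        // load(): deserializing against the interface restores the concrete type, so
        // isVirtual() survives the round trip and MRCompactionTask plus the completion
        // actions can short-circuit exactly as the diff above shows
        FileSystemDataset restored = GSON.fromJson(state.getProp(SERIALIZED_DATASET), FileSystemDataset.class);
        System.out.println(restored.datasetURN() + " virtual=" + restored.isVirtual());
      }
    }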
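Likewise, CompactionAuditCountVerifier no longer hard-codes a one-hour audit window; it derives the window from TimeIterator.inc using the configured granularity and time zone. A minimal sketch of that window computation, again illustrative only (the epoch value is an arbitrary example, and DAY granularity with the Los Angeles zone is assumed):

    import java.time.Instant;
    import java.time.ZoneId;
    import java.time.ZonedDateTime;

    import org.apache.gobblin.time.TimeIterator;

    public class CompletenessWindowSketch {
      public static void main(String[] args) {
        ZoneId zone = ZoneId.of("America/Los_Angeles");
        TimeIterator.Granularity granularity = TimeIterator.Granularity.DAY;

        // Partition time as parsed by CompactionPathParser (epoch millis in the patch)
        long partitionMillis = 1576972800000L;
        ZonedDateTime start = ZonedDateTime.ofInstant(Instant.ofEpochMilli(partitionMillis), zone);
        ZonedDateTime end = TimeIterator.inc(start, granularity, 1);

        // These bounds are what the verifier hands to AuditCountClient.fetch(datasetName, startMillis, endMillis)
        System.out.println("fetch window: [" + start.toInstant().toEpochMilli()
            + ", " + end.toInstant().toEpochMilli() + ")");
      }
    }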