This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 1de3f7e [GOBBLIN-1011] adjust compaction flow to work with virtual partition
1de3f7e is described below
commit 1de3f7e091f85fe47cdac9f554ca8217f0a028c0
Author: zhchen <[email protected]>
AuthorDate: Mon Dec 23 18:22:14 2019 -0800
[GOBBLIN-1011] adjust compaction flow to work with virtual partition
Closes #2856 from zxcware/comp2
---
.../apache/gobblin/dataset/FileSystemDataset.java | 6 ++
.../CompactionCompleteFileOperationAction.java | 5 ++
.../action/CompactionHiveRegistrationAction.java | 5 ++
.../action/CompactionMarkDirectoryAction.java | 5 ++
.../compaction/mapreduce/MRCompactionTask.java | 9 +++
.../compaction/suite/CompactionSuiteBase.java | 49 ++++++++------
.../suite/CompactionSuiteBaseFactory.java | 2 +-
.../verify/CompactionAuditCountVerifier.java | 29 ++++++--
.../verify/CompactionThresholdVerifier.java | 6 +-
.../mapreduce/AvroCompactionTaskTest.java | 37 +++++++++++
.../dataset/SimpleFileSystemDataset.java | 12 ++--
.../java/org/apache/gobblin/time/TimeIterator.java | 49 ++++++++++----
.../dataset/TimePartitionedGlobFinderTest.java | 8 +--
.../org/apache/gobblin/time/TimeIteratorTest.java | 77 ++++++++++++++++++++++
.../hive/metastore/HiveMetaStoreBasedRegister.java | 13 +++-
.../gobblin/hive/metastore/HiveMetaStoreUtils.java | 2 +-
.../org/apache/gobblin/runtime/TaskExecutor.java | 3 +-
17 files changed, 262 insertions(+), 55 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/dataset/FileSystemDataset.java
b/gobblin-api/src/main/java/org/apache/gobblin/dataset/FileSystemDataset.java
index 2c5051c..5129fc7 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/dataset/FileSystemDataset.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/dataset/FileSystemDataset.java
@@ -27,4 +27,10 @@ public interface FileSystemDataset extends Dataset {
public Path datasetRoot();
+ /**
+ * @return true if the dataset doesn't have a physical file/folder
+ */
+ default boolean isVirtual() {
+ return false;
+ }
}
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
index e4fb747..02e0578 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
@@ -33,6 +33,7 @@ import
org.apache.gobblin.compaction.parser.CompactionPathParser;
import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.dataset.SimpleFileSystemDataset;
import org.apache.gobblin.dataset.FileSystemDataset;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.util.HadoopUtils;
@@ -73,6 +74,10 @@ public class CompactionCompleteFileOperationAction
implements CompactionComplete
* Create a record count file containing the number of records that have
been processed .
*/
public void onCompactionJobComplete (FileSystemDataset dataset) throws
IOException {
+ if (dataset.isVirtual()) {
+ return;
+ }
+
if (configurator != null && configurator.isJobCreated()) {
CompactionPathParser.CompactionParserResult result = new
CompactionPathParser(state).parse(dataset);
Path tmpPath = configurator.getMrOutputPath();
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
index a7536d3..b1a4faa 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
@@ -34,6 +34,7 @@ import
org.apache.gobblin.compaction.parser.CompactionPathParser;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.dataset.SimpleFileSystemDataset;
import org.apache.gobblin.dataset.FileSystemDataset;
import org.apache.gobblin.hive.HiveRegister;
import org.apache.gobblin.hive.policy.HiveRegistrationPolicy;
@@ -57,6 +58,10 @@ public class CompactionHiveRegistrationAction implements
CompactionCompleteActio
}
public void onCompactionJobComplete(FileSystemDataset dataset) throws
IOException {
+ if (dataset.isVirtual()) {
+ return;
+ }
+
if (state.contains(ConfigurationKeys.HIVE_REGISTRATION_POLICY)) {
HiveRegister hiveRegister = HiveRegister.get(state);
HiveRegistrationPolicy hiveRegistrationPolicy =
HiveRegistrationPolicyBase.getPolicy(state);
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionMarkDirectoryAction.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionMarkDirectoryAction.java
index ac1f1d7..4f10cba 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionMarkDirectoryAction.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionMarkDirectoryAction.java
@@ -36,6 +36,7 @@ import
org.apache.gobblin.compaction.mapreduce.CompactionAvroJobConfigurator;
import org.apache.gobblin.compaction.mapreduce.MRCompactor;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.dataset.SimpleFileSystemDataset;
import org.apache.gobblin.dataset.FileSystemDataset;
import org.apache.gobblin.metrics.event.EventSubmitter;
@@ -57,6 +58,10 @@ public class CompactionMarkDirectoryAction implements
CompactionCompleteAction<F
}
public void onCompactionJobComplete (FileSystemDataset dataset) throws
IOException {
+ if (dataset.isVirtual()) {
+ return;
+ }
+
boolean renamingRequired =
this.state.getPropAsBoolean(MRCompactor.COMPACTION_RENAME_SOURCE_DIR_ENABLED,
MRCompactor.DEFAULT_COMPACTION_RENAME_SOURCE_DIR_ENABLED);
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
index 78ed1c2..3270819 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
@@ -32,7 +32,9 @@ import
org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
import org.apache.gobblin.compaction.suite.CompactionSuite;
import org.apache.gobblin.compaction.suite.CompactionSuiteUtils;
import org.apache.gobblin.compaction.verify.CompactionVerifier;
+import org.apache.gobblin.data.management.dataset.SimpleFileSystemDataset;
import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.dataset.FileSystemDataset;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.runtime.TaskContext;
import org.apache.gobblin.runtime.mapreduce.MRTask;
@@ -79,6 +81,13 @@ public class MRCompactionTask extends MRTask {
}
}
+ if (dataset instanceof FileSystemDataset
+ && ((FileSystemDataset)dataset).isVirtual()) {
+ log.info("A trivial compaction job as there is no physical data. Will
trigger a success complete directly");
+ this.onMRTaskComplete(true, null);
+ return;
+ }
+
super.run();
}
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBase.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBase.java
index a263e0c..6aa0c53 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBase.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBase.java
@@ -23,9 +23,11 @@ import java.util.LinkedList;
import java.util.List;
import org.apache.gobblin.compaction.mapreduce.CompactionJobConfigurator;
-import org.apache.hadoop.fs.Path;
+
import org.apache.hadoop.mapreduce.Job;
+import com.google.gson.Gson;
+
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.compaction.action.CompactionCompleteAction;
@@ -36,8 +38,11 @@ import
org.apache.gobblin.compaction.verify.CompactionAuditCountVerifier;
import org.apache.gobblin.compaction.verify.CompactionThresholdVerifier;
import org.apache.gobblin.compaction.verify.CompactionTimeRangeVerifier;
import org.apache.gobblin.compaction.verify.CompactionVerifier;
+import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.dataset.FileSystemDataset;
+import org.apache.gobblin.util.io.GsonInterfaceAdapter;
+
/**
* A type of {@link CompactionSuite} which implements all components needed
for file compaction.
@@ -45,9 +50,15 @@ import org.apache.gobblin.dataset.FileSystemDataset;
*/
@Slf4j
public class CompactionSuiteBase implements CompactionSuite<FileSystemDataset>
{
- public static final String SERIALIZE_COMPACTION_FILE_PATH_NAME =
"compaction-file-path-name";
- private State state;
- private CompactionJobConfigurator configurator = null;
+
+ protected State state;
+ /**
+ * Require lazy evaluation for now to support feature in
+ * {@link org.apache.gobblin.compaction.source.CompactionSource#optionalInit(SourceState)}
+ */
+ private CompactionJobConfigurator configurator;
+ private static final Gson GSON =
GsonInterfaceAdapter.getGson(FileSystemDataset.class);
+ private static final String SERIALIZED_DATASET =
"compaction.serializedDataset";
/**
* Constructor
@@ -85,7 +96,7 @@ public class CompactionSuiteBase implements
CompactionSuite<FileSystemDataset> {
* @param state A state that is used to save {@link
org.apache.gobblin.dataset.Dataset}
*/
public void save (FileSystemDataset dataset, State state) {
- state.setProp(SERIALIZE_COMPACTION_FILE_PATH_NAME, dataset.datasetURN());
+ state.setProp(SERIALIZED_DATASET, GSON.toJson(dataset));
}
/**
@@ -95,17 +106,7 @@ public class CompactionSuiteBase implements
CompactionSuite<FileSystemDataset> {
* @return A new instance of {@link FileSystemDataset}
*/
public FileSystemDataset load (final State state) {
- return new FileSystemDataset() {
- @Override
- public Path datasetRoot() {
- return new Path(state.getProp(SERIALIZE_COMPACTION_FILE_PATH_NAME));
- }
-
- @Override
- public String datasetURN() {
- return state.getProp(SERIALIZE_COMPACTION_FILE_PATH_NAME);
- }
- };
+ return GSON.fromJson(state.getProp(SERIALIZED_DATASET),
FileSystemDataset.class);
}
/**
@@ -116,9 +117,9 @@ public class CompactionSuiteBase implements
CompactionSuite<FileSystemDataset> {
*/
public List<CompactionCompleteAction<FileSystemDataset>>
getCompactionCompleteActions() {
ArrayList<CompactionCompleteAction<FileSystemDataset>> array = new
ArrayList<>();
- array.add(new CompactionCompleteFileOperationAction(state, configurator));
+ array.add(new CompactionCompleteFileOperationAction(state,
getConfigurator()));
array.add(new CompactionHiveRegistrationAction(state));
- array.add(new CompactionMarkDirectoryAction(state, configurator));
+ array.add(new CompactionMarkDirectoryAction(state, getConfigurator()));
return array;
}
@@ -130,7 +131,15 @@ public class CompactionSuiteBase implements
CompactionSuite<FileSystemDataset> {
* @return a map-reduce job which will compact files against {@link
org.apache.gobblin.dataset.Dataset}
*/
public Job createJob (FileSystemDataset dataset) throws IOException {
- configurator =
CompactionJobConfigurator.instantiateConfigurator(this.state);
- return configurator.createJob(dataset);
+ return getConfigurator().createJob(dataset);
+ }
+
+ protected CompactionJobConfigurator getConfigurator() {
+ if (configurator == null) {
+ synchronized(this) {
+ configurator =
CompactionJobConfigurator.instantiateConfigurator(this.state);
+ }
+ }
+ return configurator;
}
}
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseFactory.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseFactory.java
index 827fe28..3bada2c 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseFactory.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseFactory.java
@@ -28,4 +28,4 @@ public class CompactionSuiteBaseFactory implements
CompactionSuiteFactory {
public CompactionSuiteBase createSuite (State state) {
return new CompactionSuiteBase(state);
}
-}
+}
\ No newline at end of file
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
index 7c417df..67039e9 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
@@ -18,6 +18,9 @@
package org.apache.gobblin.compaction.verify;
import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
@@ -35,6 +38,7 @@ import org.apache.gobblin.compaction.mapreduce.MRCompactor;
import org.apache.gobblin.compaction.parser.CompactionPathParser;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.dataset.FileSystemDataset;
+import org.apache.gobblin.time.TimeIterator;
import org.apache.gobblin.util.ClassAliasResolver;
/**
@@ -46,6 +50,8 @@ import org.apache.gobblin.util.ClassAliasResolver;
public class CompactionAuditCountVerifier implements
CompactionVerifier<FileSystemDataset> {
public static final String COMPACTION_COMPLETENESS_THRESHOLD =
MRCompactor.COMPACTION_PREFIX + "completeness.threshold";
+ public static final String COMPACTION_COMMPLETENESS_ENABLED =
MRCompactor.COMPACTION_PREFIX + "completeness.enabled";
+ public static final String COMPACTION_COMMPLETENESS_GRANULARITY =
MRCompactor.COMPACTION_PREFIX + "completeness.granularity";
public static final double DEFAULT_COMPACTION_COMPLETENESS_THRESHOLD = 0.99;
public static final String PRODUCER_TIER = "producer.tier";
public static final String ORIGIN_TIER = "origin.tier";
@@ -56,9 +62,13 @@ public class CompactionAuditCountVerifier implements
CompactionVerifier<FileSyst
private String producerTier;
private String gobblinTier;
private double threshold;
- private final State state;
+ protected final State state;
private final AuditCountClient auditCountClient;
+ protected final boolean enabled;
+ protected final TimeIterator.Granularity granularity;
+ protected final ZoneId zone;
+
/**
* Constructor with default audit count client
*/
@@ -72,6 +82,10 @@ public class CompactionAuditCountVerifier implements
CompactionVerifier<FileSyst
public CompactionAuditCountVerifier (State state, AuditCountClient client) {
this.auditCountClient = client;
this.state = state;
+ this.enabled = state.getPropAsBoolean(COMPACTION_COMMPLETENESS_ENABLED,
true);
+ this.granularity = TimeIterator.Granularity.valueOf(
+ state.getProp(COMPACTION_COMMPLETENESS_GRANULARITY, "HOUR"));
+ this.zone = ZoneId.of(state.getProp(MRCompactor.COMPACTION_TIMEZONE,
MRCompactor.DEFAULT_COMPACTION_TIMEZONE));
// retrieve all tiers information
if (client != null) {
@@ -93,7 +107,6 @@ public class CompactionAuditCountVerifier implements
CompactionVerifier<FileSyst
* returned which creates a <code>null</code> {@link
AuditCountClient}
*/
private static AuditCountClientFactory getClientFactory (State state) {
-
if (!state.contains(AuditCountClientFactory.AUDIT_COUNT_CLIENT_FACTORY)) {
return new EmptyAuditCountClientFactory ();
}
@@ -118,17 +131,21 @@ public class CompactionAuditCountVerifier implements
CompactionVerifier<FileSyst
* @return If verification is succeeded
*/
public Result verify (FileSystemDataset dataset) {
+ if (!enabled) {
+ return new Result(true, "");
+ }
if (auditCountClient == null) {
log.debug("No audit count client specified, skipped");
return new Result(true, "");
}
- CompactionPathParser.CompactionParserResult result = new
CompactionPathParser(this.state).parse(dataset);
- DateTime startTime = result.getTime();
- DateTime endTime = startTime.plusHours(1);
+ CompactionPathParser.CompactionParserResult result = new
CompactionPathParser(state).parse(dataset);
+ ZonedDateTime startTime =
ZonedDateTime.ofInstant(Instant.ofEpochMilli(result.getTime().getMillis()),
zone);
+ ZonedDateTime endTime = TimeIterator.inc(startTime, granularity, 1);
String datasetName = result.getDatasetName();
try {
- Map<String, Long> countsByTier = auditCountClient.fetch (datasetName,
startTime.getMillis(), endTime.getMillis());
+ Map<String, Long> countsByTier = auditCountClient.fetch(datasetName,
+ startTime.toInstant().toEpochMilli(),
endTime.toInstant().toEpochMilli());
for (String tier: referenceTiers) {
Result rst = passed (datasetName, countsByTier, tier);
if (rst.isSuccessful()) {
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
index 03ab36d..0eed686 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
@@ -32,6 +32,7 @@ import
org.apache.gobblin.compaction.conditions.RecompactionConditionBasedOnRati
import org.apache.gobblin.compaction.mapreduce.MRCompactor;
import org.apache.gobblin.compaction.parser.CompactionPathParser;
import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.data.management.dataset.SimpleFileSystemDataset;
import org.apache.gobblin.dataset.FileSystemDataset;
/**
@@ -74,7 +75,10 @@ public class CompactionThresholdVerifier implements
CompactionVerifier<FileSyste
InputRecordCountHelper helper = new InputRecordCountHelper(state);
try {
- double newRecords = helper.calculateRecordCount (Lists.newArrayList(new
Path(dataset.datasetURN())));
+ double newRecords = 0;
+ if (!dataset.isVirtual()) {
+ newRecords = helper.calculateRecordCount (Lists.newArrayList(new
Path(dataset.datasetURN())));
+ }
double oldRecords = helper.readRecordCount (new
Path(result.getDstAbsoluteDir()));
if (oldRecords == 0) {
diff --git
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
index a39658e..19d01fa 100644
---
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
+++
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
@@ -47,7 +47,9 @@ import
org.apache.gobblin.compaction.verify.CompactionAuditCountVerifier;
import org.apache.gobblin.compaction.verify.CompactionVerifier;
import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.data.management.dataset.DatasetUtils;
import
org.apache.gobblin.data.management.dataset.SimpleDatasetHierarchicalPrioritizer;
+import org.apache.gobblin.data.management.dataset.TimePartitionGlobFinder;
import
org.apache.gobblin.data.management.retention.profile.ConfigurableGlobDatasetFinder;
import org.apache.gobblin.runtime.api.JobExecutionResult;
import org.apache.gobblin.runtime.embedded.EmbeddedGobblin;
@@ -104,6 +106,41 @@ public class AvroCompactionTaskTest {
}
@Test
+ public void testCompactVirtualDataset() throws Exception {
+
+ File basePath = Files.createTempDir();
+ basePath.deleteOnExit();
+
+ File jobDir = new File(basePath, "PageViewEvent");
+ Assert.assertTrue(jobDir.mkdirs());
+
+ String pattern = new Path(basePath.getAbsolutePath(), "*").toString();
+ String jobName = "compaction-virtual";
+
+ EmbeddedGobblin embeddedGobblin = new EmbeddedGobblin(jobName)
+ .setConfiguration(ConfigurationKeys.SOURCE_CLASS_KEY,
CompactionSource.class.getName())
+
.setConfiguration(ConfigurableGlobDatasetFinder.DATASET_FINDER_PATTERN_KEY,
pattern)
+ .setConfiguration(MRCompactor.COMPACTION_INPUT_DIR,
basePath.toString())
+ .setConfiguration(MRCompactor.COMPACTION_INPUT_SUBDIR, "hourly")
+ .setConfiguration(MRCompactor.COMPACTION_DEST_DIR, basePath.toString())
+ .setConfiguration(MRCompactor.COMPACTION_DEST_SUBDIR, "daily")
+ .setConfiguration(MRCompactor.COMPACTION_TMP_DEST_DIR,
"/tmp/compaction/" + jobName)
+
.setConfiguration(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MAX_TIME_AGO,
"3d")
+
.setConfiguration(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MIN_TIME_AGO,
"1d")
+ .setConfiguration(ConfigurationKeys.MAX_TASK_RETRIES_KEY, "0")
+ .setConfiguration(DatasetUtils.DATASET_PROFILE_CLASS_KEY,
+
"org.apache.gobblin.data.management.dataset.TimePartitionGlobFinder")
+ .setConfiguration(TimePartitionGlobFinder.PARTITION_PREFIX, "hourly/")
+ .setConfiguration(TimePartitionGlobFinder.TIME_FORMAT, "yyyy/MM/dd")
+ .setConfiguration(TimePartitionGlobFinder.GRANULARITY, "DAY")
+ .setConfiguration(TimePartitionGlobFinder.LOOKBACK_SPEC, "P3D")
+ .setConfiguration(TimePartitionGlobFinder.ENABLE_VIRTUAL_PARTITION,
"true");
+
+ JobExecutionResult result = embeddedGobblin.run();
+ Assert.assertTrue(result.isSuccessful());
+ }
+
+ @Test
public void testRecompaction () throws Exception {
FileSystem fs = getFileSystem();
String basePath = "/tmp/testRecompaction";
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/SimpleFileSystemDataset.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/SimpleFileSystemDataset.java
index 338596c..e1822b8 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/SimpleFileSystemDataset.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/SimpleFileSystemDataset.java
@@ -29,7 +29,7 @@ import org.apache.gobblin.dataset.FileSystemDataset;
public class SimpleFileSystemDataset implements FileSystemDataset {
private final Path path;
- private final boolean isVirtual;
+ private final boolean _isVirtual;
public SimpleFileSystemDataset(Path path) {
this(path, false);
@@ -37,7 +37,7 @@ public class SimpleFileSystemDataset implements
FileSystemDataset {
public SimpleFileSystemDataset(Path path, boolean isVirtual) {
this.path = path;
- this.isVirtual = isVirtual;
+ _isVirtual = isVirtual;
}
@Override
@@ -50,10 +50,8 @@ public class SimpleFileSystemDataset implements
FileSystemDataset {
return path.toString();
}
- /**
- * @return true if the dataset doesn't have a physical file/folder
- */
- public boolean getIsVirtual() {
- return isVirtual;
+ @Override
+ public boolean isVirtual() {
+ return _isVirtual;
}
}
\ No newline at end of file
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/time/TimeIterator.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/time/TimeIterator.java
index 4630a7a..a9ef4d4 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/time/TimeIterator.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/time/TimeIterator.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.time;
import java.time.ZonedDateTime;
import java.util.Iterator;
+import java.util.NoSuchElementException;
/**
@@ -48,28 +49,50 @@ public class TimeIterator implements Iterator {
@Override
public ZonedDateTime next() {
+ if (startTime.isAfter(endTime)) {
+ throw new NoSuchElementException();
+ }
ZonedDateTime dateTime = startTime;
+ startTime = inc(startTime, granularity, 1);
+ return dateTime;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ /**
+ * Increase the given time by {@code units}, which must be positive, of {@code granularity}
+ */
+ public static ZonedDateTime inc(ZonedDateTime time, Granularity granularity,
long units) {
switch (granularity) {
case MINUTE:
- startTime = startTime.plusMinutes(1);
- break;
+ return time.plusMinutes(units);
case HOUR:
- startTime = startTime.plusHours(1);
- break;
+ return time.plusHours(units);
case DAY:
- startTime = startTime.plusDays(1);
- break;
+ return time.plusDays(units);
case MONTH:
- startTime = startTime.plusMonths(1);
- break;
+ return time.plusMonths(units);
}
-
- return dateTime;
+ throw new RuntimeException("Unsupported granularity: " + granularity);
}
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
+ /**
+ * Decrease the given time by {@code units}, which must be positive, of {@code granularity}
+ */
+ public static ZonedDateTime dec(ZonedDateTime time, Granularity granularity,
long units) {
+ switch (granularity) {
+ case MINUTE:
+ return time.minusMinutes(units);
+ case HOUR:
+ return time.minusHours(units);
+ case DAY:
+ return time.minusDays(units);
+ case MONTH:
+ return time.minusMonths(units);
+ }
+ throw new RuntimeException("Unsupported granularity: " + granularity);
}
}
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java
index fca8bb3..4646029 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java
@@ -97,11 +97,11 @@ public class TimePartitionedGlobFinderTest {
datasets = finder.findDatasets();
Assert.assertEquals(datasets.size(), 6);
// Verify virtual partitions for /db1/table1
- Assert.assertTrue(find(getPartitionPath(ds1, hourlyPrefix, -1, dayFormat),
datasets).getIsVirtual());
- Assert.assertTrue(find(getPartitionPath(ds1, hourlyPrefix, -2, dayFormat),
datasets).getIsVirtual());
+ Assert.assertTrue(find(getPartitionPath(ds1, hourlyPrefix, -1, dayFormat),
datasets).isVirtual());
+ Assert.assertTrue(find(getPartitionPath(ds1, hourlyPrefix, -2, dayFormat),
datasets).isVirtual());
// Verify virtual partitions for /db2/table2
- Assert.assertTrue(find(getPartitionPath(ds2, hourlyPrefix, 0, dayFormat),
datasets).getIsVirtual());
- Assert.assertTrue(find(getPartitionPath(ds2, hourlyPrefix, -2, dayFormat),
datasets).getIsVirtual());
+ Assert.assertTrue(find(getPartitionPath(ds2, hourlyPrefix, 0, dayFormat),
datasets).isVirtual());
+ Assert.assertTrue(find(getPartitionPath(ds2, hourlyPrefix, -2, dayFormat),
datasets).isVirtual());
}
private Path getPartitionPath(Path dataset, String prefix, int dayOffset,
String format) {
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/time/TimeIteratorTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/time/TimeIteratorTest.java
new file mode 100644
index 0000000..50c4e08
--- /dev/null
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/time/TimeIteratorTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.time;
+
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Unit tests for {@link TimeIterator}
+ */
+public class TimeIteratorTest {
+
+ private ZoneId zone = ZoneId.of("America/Los_Angeles");
+
+ /**
+ * A representative unit test to cover iterating. Actual computations are covered by {@link #testInc()}
+ */
+ @Test
+ public void testIterator() {
+ ZonedDateTime startTime = ZonedDateTime.of(2019,12,20,11,
+ 20,30, 0, zone);
+ ZonedDateTime endTime = startTime.plusDays(12);
+ TimeIterator iterator = new TimeIterator(startTime, endTime,
TimeIterator.Granularity.DAY);
+ int days = 0;
+ while (iterator.hasNext()) {
+ Assert.assertEquals(iterator.next(), startTime.plusDays(days++));
+ }
+ Assert.assertEquals(days, 13);
+ }
+
+ @Test
+ public void testInc() {
+ ZonedDateTime startTime = ZonedDateTime.of(2019,12,20,11,
+ 20,30, 0, zone);
+ Assert.assertEquals(TimeIterator.inc(startTime,
TimeIterator.Granularity.MINUTE, 40).toString(),
+ "2019-12-20T12:00:30-08:00[America/Los_Angeles]");
+ Assert.assertEquals(TimeIterator.inc(startTime,
TimeIterator.Granularity.HOUR, 13).toString(),
+ "2019-12-21T00:20:30-08:00[America/Los_Angeles]");
+ Assert.assertEquals(TimeIterator.inc(startTime,
TimeIterator.Granularity.DAY, 12).toString(),
+ "2020-01-01T11:20:30-08:00[America/Los_Angeles]");
+ Assert.assertEquals(TimeIterator.inc(startTime,
TimeIterator.Granularity.MONTH, 1).toString(),
+ "2020-01-20T11:20:30-08:00[America/Los_Angeles]");
+ }
+
+ @Test
+ public void testDec() {
+ ZonedDateTime startTime = ZonedDateTime.of(2019,12,20,11,
+ 20,30, 0, zone);
+ Assert.assertEquals(TimeIterator.dec(startTime,
TimeIterator.Granularity.MINUTE, 21).toString(),
+ "2019-12-20T10:59:30-08:00[America/Los_Angeles]");
+ Assert.assertEquals(TimeIterator.dec(startTime,
TimeIterator.Granularity.HOUR, 12).toString(),
+ "2019-12-19T23:20:30-08:00[America/Los_Angeles]");
+ Assert.assertEquals(TimeIterator.dec(startTime,
TimeIterator.Granularity.DAY, 20).toString(),
+ "2019-11-30T11:20:30-08:00[America/Los_Angeles]");
+ Assert.assertEquals(TimeIterator.dec(startTime,
TimeIterator.Granularity.MONTH, 12).toString(),
+ "2018-12-20T11:20:30-08:00[America/Los_Angeles]");
+ }
+}
diff --git
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
index a25efdd..15f5982 100644
---
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
+++
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import lombok.extern.slf4j.Slf4j;
@@ -473,6 +474,10 @@ public class HiveMetaStoreBasedRegister extends
HiveRegister {
public void dropPartitionIfExists(String dbName, String tableName,
List<Column> partitionKeys,
List<String> partitionValues) throws IOException {
try (AutoReturnableObject<IMetaStoreClient> client =
this.clientPool.getClient()) {
+ if (client.get().getPartition(dbName, tableName, partitionValues) ==
null) {
+ // Partition does not exist. Nothing to do
+ return;
+ }
try (Timer.Context context =
this.metricContext.timer(DROP_TABLE).time()) {
client.get().dropPartition(dbName, tableName, partitionValues, false);
}
@@ -755,6 +760,12 @@ public class HiveMetaStoreBasedRegister extends
HiveRegister {
* @param existingTable
*/
protected Table getNewTblByMergingExistingTblProps(Table newTable, HiveTable
existingTable) {
- return getTableWithCreateTime(newTable, existingTable);
+ Table table = getTableWithCreateTime(newTable, existingTable);
+ // Get existing parameters
+ Map<String, String> allParameters =
HiveMetaStoreUtils.getParameters(existingTable.getProps());
+ // Apply new parameters
+ allParameters.putAll(table.getParameters());
+ table.setParameters(allParameters);
+ return table;
}
}
diff --git
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
index 976c9e3..7957e62 100644
---
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
+++
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
@@ -189,7 +189,7 @@ public class HiveMetaStoreUtils {
return hivePartition;
}
- private static Map<String, String> getParameters(State props) {
+ public static Map<String, String> getParameters(State props) {
Map<String, String> parameters = Maps.newHashMap();
if (props.contains(RUNTIME_PROPS)) {
String runtimePropsString = props.getProp(RUNTIME_PROPS);
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
index b868893..755d972 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
@@ -441,9 +441,10 @@ public class TaskExecutor extends AbstractIdleService {
onStart(startTime);
try {
this.underlyingTask.run();
- successfulTaskCount.mark();;
+ successfulTaskCount.mark();
} catch (Exception e) {
failedTaskCount.mark();
+ LOG.error(String.format("Task %s failed", underlyingTask.getTaskId()),
e);
throw e;
} finally {
runningTaskCount.dec();