This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 1de3f7e  [GOBBLIN-1011] adjust compaction flow to work with virtual partition
1de3f7e is described below

commit 1de3f7e091f85fe47cdac9f554ca8217f0a028c0
Author: zhchen <zhc...@linkedin.com>
AuthorDate: Mon Dec 23 18:22:14 2019 -0800

    [GOBBLIN-1011] adjust compaction flow to work with virtual partition
    
    Closes #2856 from zxcware/comp2
---
 .../apache/gobblin/dataset/FileSystemDataset.java  |  6 ++
 .../CompactionCompleteFileOperationAction.java     |  5 ++
 .../action/CompactionHiveRegistrationAction.java   |  5 ++
 .../action/CompactionMarkDirectoryAction.java      |  5 ++
 .../compaction/mapreduce/MRCompactionTask.java     |  9 +++
 .../compaction/suite/CompactionSuiteBase.java      | 49 ++++++++------
 .../suite/CompactionSuiteBaseFactory.java          |  2 +-
 .../verify/CompactionAuditCountVerifier.java       | 29 ++++++--
 .../verify/CompactionThresholdVerifier.java        |  6 +-
 .../mapreduce/AvroCompactionTaskTest.java          | 37 +++++++++++
 .../dataset/SimpleFileSystemDataset.java           | 12 ++--
 .../java/org/apache/gobblin/time/TimeIterator.java | 49 ++++++++++----
 .../dataset/TimePartitionedGlobFinderTest.java     |  8 +--
 .../org/apache/gobblin/time/TimeIteratorTest.java  | 77 ++++++++++++++++++++++
 .../hive/metastore/HiveMetaStoreBasedRegister.java | 13 +++-
 .../gobblin/hive/metastore/HiveMetaStoreUtils.java |  2 +-
 .../org/apache/gobblin/runtime/TaskExecutor.java   |  3 +-
 17 files changed, 262 insertions(+), 55 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/FileSystemDataset.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/FileSystemDataset.java
index 2c5051c..5129fc7 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/dataset/FileSystemDataset.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/FileSystemDataset.java
@@ -27,4 +27,10 @@ public interface FileSystemDataset extends Dataset {
 
   public Path datasetRoot();
 
+  /**
+   * @return true if the dataset doesn't have a physical file/folder
+   */
+  default boolean isVirtual() {
+    return false;
+  }
 }
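
The new default keeps every existing FileSystemDataset implementation behaving as before (physical); only implementations that override isVirtual() opt in. A minimal sketch of the contract, with a hypothetical path (not part of this commit):

    import org.apache.hadoop.fs.Path;
    import org.apache.gobblin.dataset.FileSystemDataset;

    public class IsVirtualDefaultSketch {
      public static void main(String[] args) {
        // A pre-existing style implementation: overrides only the two abstract methods.
        FileSystemDataset physical = new FileSystemDataset() {
          @Override
          public Path datasetRoot() {
            return new Path("/data/tracking/PageViewEvent"); // hypothetical root
          }

          @Override
          public String datasetURN() {
            return datasetRoot().toString();
          }
        };
        // Falls back to the new interface default.
        System.out.println(physical.isVirtual()); // prints "false"
      }
    }
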
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
index e4fb747..02e0578 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
@@ -33,6 +33,7 @@ import org.apache.gobblin.compaction.parser.CompactionPathParser;
 import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.dataset.SimpleFileSystemDataset;
 import org.apache.gobblin.dataset.FileSystemDataset;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.util.HadoopUtils;
@@ -73,6 +74,10 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete
   * Create a record count file containing the number of records that have been processed .
    */
  public void onCompactionJobComplete (FileSystemDataset dataset) throws IOException {
+    if (dataset.isVirtual()) {
+      return;
+    }
+
     if (configurator != null && configurator.isJobCreated()) {
      CompactionPathParser.CompactionParserResult result = new CompactionPathParser(state).parse(dataset);
       Path tmpPath = configurator.getMrOutputPath();
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
index a7536d3..b1a4faa 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
@@ -34,6 +34,7 @@ import org.apache.gobblin.compaction.parser.CompactionPathParser;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.dataset.SimpleFileSystemDataset;
 import org.apache.gobblin.dataset.FileSystemDataset;
 import org.apache.gobblin.hive.HiveRegister;
 import org.apache.gobblin.hive.policy.HiveRegistrationPolicy;
@@ -57,6 +58,10 @@ public class CompactionHiveRegistrationAction implements CompactionCompleteActio
   }
 
  public void onCompactionJobComplete(FileSystemDataset dataset) throws IOException {
+    if (dataset.isVirtual()) {
+      return;
+    }
+
     if (state.contains(ConfigurationKeys.HIVE_REGISTRATION_POLICY)) {
       HiveRegister hiveRegister = HiveRegister.get(state);
      HiveRegistrationPolicy hiveRegistrationPolicy = HiveRegistrationPolicyBase.getPolicy(state);
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionMarkDirectoryAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionMarkDirectoryAction.java
index ac1f1d7..4f10cba 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionMarkDirectoryAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionMarkDirectoryAction.java
@@ -36,6 +36,7 @@ import org.apache.gobblin.compaction.mapreduce.CompactionAvroJobConfigurator;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.dataset.SimpleFileSystemDataset;
 import org.apache.gobblin.dataset.FileSystemDataset;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 
@@ -57,6 +58,10 @@ public class CompactionMarkDirectoryAction implements CompactionCompleteAction<F
   }
 
  public void onCompactionJobComplete (FileSystemDataset dataset) throws IOException {
+    if (dataset.isVirtual()) {
+      return;
+    }
+
     boolean renamingRequired = this.state.getPropAsBoolean(MRCompactor.COMPACTION_RENAME_SOURCE_DIR_ENABLED,
             MRCompactor.DEFAULT_COMPACTION_RENAME_SOURCE_DIR_ENABLED);
 
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
index 78ed1c2..3270819 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
@@ -32,7 +32,9 @@ import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
 import org.apache.gobblin.compaction.suite.CompactionSuite;
 import org.apache.gobblin.compaction.suite.CompactionSuiteUtils;
 import org.apache.gobblin.compaction.verify.CompactionVerifier;
+import org.apache.gobblin.data.management.dataset.SimpleFileSystemDataset;
 import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.dataset.FileSystemDataset;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.runtime.TaskContext;
 import org.apache.gobblin.runtime.mapreduce.MRTask;
@@ -79,6 +81,13 @@ public class MRCompactionTask extends MRTask {
       }
     }
 
+    if (dataset instanceof FileSystemDataset
+        && ((FileSystemDataset)dataset).isVirtual()) {
+      log.info("A trivial compaction job as there is no physical data. Will trigger a success complete directly");
+      this.onMRTaskComplete(true, null);
+      return;
+    }
+
     super.run();
   }
 
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBase.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBase.java
index a263e0c..6aa0c53 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBase.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBase.java
@@ -23,9 +23,11 @@ import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.gobblin.compaction.mapreduce.CompactionJobConfigurator;
-import org.apache.hadoop.fs.Path;
+
 import org.apache.hadoop.mapreduce.Job;
 
+import com.google.gson.Gson;
+
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.compaction.action.CompactionCompleteAction;
@@ -36,8 +38,11 @@ import org.apache.gobblin.compaction.verify.CompactionAuditCountVerifier;
 import org.apache.gobblin.compaction.verify.CompactionThresholdVerifier;
 import org.apache.gobblin.compaction.verify.CompactionTimeRangeVerifier;
 import org.apache.gobblin.compaction.verify.CompactionVerifier;
+import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.dataset.FileSystemDataset;
+import org.apache.gobblin.util.io.GsonInterfaceAdapter;
+
 
 /**
 * A type of {@link CompactionSuite} which implements all components needed for file compaction.
@@ -45,9 +50,15 @@ import org.apache.gobblin.dataset.FileSystemDataset;
  */
 @Slf4j
public class CompactionSuiteBase implements CompactionSuite<FileSystemDataset> {
-  public static final String SERIALIZE_COMPACTION_FILE_PATH_NAME = "compaction-file-path-name";
-  private State state;
-  private CompactionJobConfigurator configurator = null;
+
+  protected State state;
+  /**
+   * Require lazy evaluation for now to support feature in
+   * {@link org.apache.gobblin.compaction.source.CompactionSource#optionalInit(SourceState)}
+   */
+  private CompactionJobConfigurator configurator;
+  private static final Gson GSON = GsonInterfaceAdapter.getGson(FileSystemDataset.class);
+  private static final String SERIALIZED_DATASET = "compaction.serializedDataset";
 
   /**
    * Constructor
@@ -85,7 +96,7 @@ public class CompactionSuiteBase implements CompactionSuite<FileSystemDataset> {
   * @param state   A state that is used to save {@link org.apache.gobblin.dataset.Dataset}
    */
   public void save (FileSystemDataset dataset, State state) {
-    state.setProp(SERIALIZE_COMPACTION_FILE_PATH_NAME, dataset.datasetURN());
+    state.setProp(SERIALIZED_DATASET, GSON.toJson(dataset));
   }
 
   /**
@@ -95,17 +106,7 @@ public class CompactionSuiteBase implements CompactionSuite<FileSystemDataset> {
    * @return A new instance of {@link FileSystemDataset}
    */
   public FileSystemDataset load (final State state) {
-    return new FileSystemDataset() {
-      @Override
-      public Path datasetRoot() {
-        return new Path(state.getProp(SERIALIZE_COMPACTION_FILE_PATH_NAME));
-      }
-
-      @Override
-      public String datasetURN() {
-        return state.getProp(SERIALIZE_COMPACTION_FILE_PATH_NAME);
-      }
-    };
+    return GSON.fromJson(state.getProp(SERIALIZED_DATASET), FileSystemDataset.class);
   }
 
   /**
@@ -116,9 +117,9 @@ public class CompactionSuiteBase implements CompactionSuite<FileSystemDataset> {
    */
  public List<CompactionCompleteAction<FileSystemDataset>> getCompactionCompleteActions() {
    ArrayList<CompactionCompleteAction<FileSystemDataset>> array = new ArrayList<>();
-    array.add(new CompactionCompleteFileOperationAction(state, configurator));
+    array.add(new CompactionCompleteFileOperationAction(state, getConfigurator()));
     array.add(new CompactionHiveRegistrationAction(state));
-    array.add(new CompactionMarkDirectoryAction(state, configurator));
+    array.add(new CompactionMarkDirectoryAction(state, getConfigurator()));
     return array;
   }
 
@@ -130,7 +131,15 @@ public class CompactionSuiteBase implements CompactionSuite<FileSystemDataset> {
   * @return a map-reduce job which will compact files against {@link org.apache.gobblin.dataset.Dataset}
    */
   public Job createJob (FileSystemDataset dataset) throws IOException {
-    configurator = CompactionJobConfigurator.instantiateConfigurator(this.state);
-    return configurator.createJob(dataset);
+    return getConfigurator().createJob(dataset);
+  }
+
+  protected CompactionJobConfigurator getConfigurator() {
+    if (configurator == null) {
+      synchronized(this) {
+      configurator = CompactionJobConfigurator.instantiateConfigurator(this.state);
+      }
+    }
+    return configurator;
   }
 }
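
With save/load now going through Gson, the whole dataset object, including the virtual flag added in this change, survives the serialization round trip instead of just the path string. A rough sketch of that round trip in a test-style setup, using a hypothetical path (not part of this commit):

    import org.apache.hadoop.fs.Path;

    import org.apache.gobblin.compaction.suite.CompactionSuiteBase;
    import org.apache.gobblin.configuration.State;
    import org.apache.gobblin.data.management.dataset.SimpleFileSystemDataset;
    import org.apache.gobblin.dataset.FileSystemDataset;

    public class SerializedDatasetSketch {
      public static void main(String[] args) {
        State state = new State();
        CompactionSuiteBase suite = new CompactionSuiteBase(state);

        // A virtual day-level partition, e.g. one discovered without physical files (hypothetical path).
        FileSystemDataset virtualDay =
            new SimpleFileSystemDataset(new Path("/data/PageViewEvent/hourly/2019/12/22"), true);

        suite.save(virtualDay, state);              // stored as JSON under "compaction.serializedDataset"
        FileSystemDataset restored = suite.load(state);
        System.out.println(restored.isVirtual());   // prints "true" -- the flag is preserved
      }
    }
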
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseFactory.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseFactory.java
index 827fe28..3bada2c 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseFactory.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseFactory.java
@@ -28,4 +28,4 @@ public class CompactionSuiteBaseFactory implements CompactionSuiteFactory {
   public CompactionSuiteBase createSuite (State state) {
     return new CompactionSuiteBase(state);
   }
-}
+}
\ No newline at end of file
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
index 7c417df..67039e9 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
@@ -18,6 +18,9 @@
 package org.apache.gobblin.compaction.verify;
 
 import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Map;
@@ -35,6 +38,7 @@ import org.apache.gobblin.compaction.mapreduce.MRCompactor;
 import org.apache.gobblin.compaction.parser.CompactionPathParser;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.dataset.FileSystemDataset;
+import org.apache.gobblin.time.TimeIterator;
 import org.apache.gobblin.util.ClassAliasResolver;
 
 /**
@@ -46,6 +50,8 @@ import org.apache.gobblin.util.ClassAliasResolver;
public class CompactionAuditCountVerifier implements CompactionVerifier<FileSystemDataset> {
 
  public static final String COMPACTION_COMPLETENESS_THRESHOLD = MRCompactor.COMPACTION_PREFIX + "completeness.threshold";
+  public static final String COMPACTION_COMMPLETENESS_ENABLED = MRCompactor.COMPACTION_PREFIX + "completeness.enabled";
+  public static final String COMPACTION_COMMPLETENESS_GRANULARITY = MRCompactor.COMPACTION_PREFIX + "completeness.granularity";
   public static final double DEFAULT_COMPACTION_COMPLETENESS_THRESHOLD = 0.99;
   public static final String PRODUCER_TIER = "producer.tier";
   public static final String ORIGIN_TIER = "origin.tier";
@@ -56,9 +62,13 @@ public class CompactionAuditCountVerifier implements CompactionVerifier<FileSyst
   private  String producerTier;
   private  String gobblinTier;
   private  double threshold;
-  private final State state;
+  protected final State state;
   private final AuditCountClient auditCountClient;
 
+  protected final boolean enabled;
+  protected final TimeIterator.Granularity granularity;
+  protected final ZoneId zone;
+
   /**
    * Constructor with default audit count client
    */
@@ -72,6 +82,10 @@ public class CompactionAuditCountVerifier implements CompactionVerifier<FileSyst
   public CompactionAuditCountVerifier (State state, AuditCountClient client) {
     this.auditCountClient = client;
     this.state = state;
+    this.enabled = state.getPropAsBoolean(COMPACTION_COMMPLETENESS_ENABLED, true);
+    this.granularity = TimeIterator.Granularity.valueOf(
+        state.getProp(COMPACTION_COMMPLETENESS_GRANULARITY, "HOUR"));
+    this.zone = ZoneId.of(state.getProp(MRCompactor.COMPACTION_TIMEZONE, MRCompactor.DEFAULT_COMPACTION_TIMEZONE));
 
     // retrieve all tiers information
     if (client != null) {
@@ -93,7 +107,6 @@ public class CompactionAuditCountVerifier implements CompactionVerifier<FileSyst
   *         returned which creates a <code>null</code> {@link AuditCountClient}
    */
   private static AuditCountClientFactory getClientFactory (State state) {
-
     if (!state.contains(AuditCountClientFactory.AUDIT_COUNT_CLIENT_FACTORY)) {
       return new EmptyAuditCountClientFactory ();
     }
@@ -118,17 +131,21 @@ public class CompactionAuditCountVerifier implements CompactionVerifier<FileSyst
    * @return If verification is succeeded
    */
   public Result verify (FileSystemDataset dataset) {
+    if (!enabled) {
+      return new Result(true, "");
+    }
     if (auditCountClient == null) {
       log.debug("No audit count client specified, skipped");
       return new Result(true, "");
     }
 
-    CompactionPathParser.CompactionParserResult result = new CompactionPathParser(this.state).parse(dataset);
-    DateTime startTime = result.getTime();
-    DateTime endTime = startTime.plusHours(1);
+    CompactionPathParser.CompactionParserResult result = new CompactionPathParser(state).parse(dataset);
+    ZonedDateTime startTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(result.getTime().getMillis()), zone);
+    ZonedDateTime endTime = TimeIterator.inc(startTime, granularity, 1);
     String datasetName = result.getDatasetName();
     try {
-      Map<String, Long> countsByTier = auditCountClient.fetch (datasetName, startTime.getMillis(), endTime.getMillis());
+      Map<String, Long> countsByTier = auditCountClient.fetch(datasetName,
+          startTime.toInstant().toEpochMilli(), endTime.toInstant().toEpochMilli());
       for (String tier: referenceTiers) {
         Result rst = passed (datasetName, countsByTier, tier);
         if (rst.isSuccessful()) {
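
This adds two knobs: completeness verification can be turned off outright, and the audit window follows a configurable TimeIterator.Granularity (MINUTE, HOUR, DAY or MONTH; default HOUR) rather than a fixed one-hour window. A small sketch of disabling it via the new property, assuming the single-argument constructor that builds the default audit count client (not part of this commit):

    import org.apache.gobblin.compaction.verify.CompactionAuditCountVerifier;
    import org.apache.gobblin.configuration.State;

    public class DisableCompletenessSketch {
      public static void main(String[] args) throws Exception {
        State state = new State();
        // The key resolves to MRCompactor.COMPACTION_PREFIX + "completeness.enabled".
        state.setProp(CompactionAuditCountVerifier.COMPACTION_COMMPLETENESS_ENABLED, Boolean.FALSE.toString());

        CompactionAuditCountVerifier verifier = new CompactionAuditCountVerifier(state);
        // With the flag off, verify(dataset) short-circuits to a successful Result
        // without contacting any audit count client.
      }
    }
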
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
index 03ab36d..0eed686 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
@@ -32,6 +32,7 @@ import org.apache.gobblin.compaction.conditions.RecompactionConditionBasedOnRati
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
 import org.apache.gobblin.compaction.parser.CompactionPathParser;
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.data.management.dataset.SimpleFileSystemDataset;
 import org.apache.gobblin.dataset.FileSystemDataset;
 
 /**
@@ -74,7 +75,10 @@ public class CompactionThresholdVerifier implements CompactionVerifier<FileSyste
 
     InputRecordCountHelper helper = new InputRecordCountHelper(state);
     try {
-      double newRecords = helper.calculateRecordCount (Lists.newArrayList(new Path(dataset.datasetURN())));
+      double newRecords = 0;
+      if (!dataset.isVirtual()) {
+        newRecords = helper.calculateRecordCount (Lists.newArrayList(new Path(dataset.datasetURN())));
+      }
      double oldRecords = helper.readRecordCount (new Path(result.getDstAbsoluteDir()));
 
       if (oldRecords == 0) {
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
index a39658e..19d01fa 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
@@ -47,7 +47,9 @@ import org.apache.gobblin.compaction.verify.CompactionAuditCountVerifier;
 import org.apache.gobblin.compaction.verify.CompactionVerifier;
 import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.data.management.dataset.DatasetUtils;
 import org.apache.gobblin.data.management.dataset.SimpleDatasetHierarchicalPrioritizer;
+import org.apache.gobblin.data.management.dataset.TimePartitionGlobFinder;
 import org.apache.gobblin.data.management.retention.profile.ConfigurableGlobDatasetFinder;
 import org.apache.gobblin.runtime.api.JobExecutionResult;
 import org.apache.gobblin.runtime.embedded.EmbeddedGobblin;
@@ -104,6 +106,41 @@ public class AvroCompactionTaskTest {
   }
 
   @Test
+  public void testCompactVirtualDataset() throws Exception {
+
+    File basePath = Files.createTempDir();
+    basePath.deleteOnExit();
+
+    File jobDir = new File(basePath, "PageViewEvent");
+    Assert.assertTrue(jobDir.mkdirs());
+
+    String pattern = new Path(basePath.getAbsolutePath(), "*").toString();
+    String jobName = "compaction-virtual";
+
+    EmbeddedGobblin embeddedGobblin = new EmbeddedGobblin(jobName)
+        .setConfiguration(ConfigurationKeys.SOURCE_CLASS_KEY, CompactionSource.class.getName())
+        .setConfiguration(ConfigurableGlobDatasetFinder.DATASET_FINDER_PATTERN_KEY, pattern)
+        .setConfiguration(MRCompactor.COMPACTION_INPUT_DIR, basePath.toString())
+        .setConfiguration(MRCompactor.COMPACTION_INPUT_SUBDIR, "hourly")
+        .setConfiguration(MRCompactor.COMPACTION_DEST_DIR, basePath.toString())
+        .setConfiguration(MRCompactor.COMPACTION_DEST_SUBDIR, "daily")
+        .setConfiguration(MRCompactor.COMPACTION_TMP_DEST_DIR, "/tmp/compaction/" + jobName)
+        .setConfiguration(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MAX_TIME_AGO, "3d")
+        .setConfiguration(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MIN_TIME_AGO, "1d")
+        .setConfiguration(ConfigurationKeys.MAX_TASK_RETRIES_KEY, "0")
+        .setConfiguration(DatasetUtils.DATASET_PROFILE_CLASS_KEY,
+            "org.apache.gobblin.data.management.dataset.TimePartitionGlobFinder")
+        .setConfiguration(TimePartitionGlobFinder.PARTITION_PREFIX, "hourly/")
+        .setConfiguration(TimePartitionGlobFinder.TIME_FORMAT, "yyyy/MM/dd")
+        .setConfiguration(TimePartitionGlobFinder.GRANULARITY, "DAY")
+        .setConfiguration(TimePartitionGlobFinder.LOOKBACK_SPEC, "P3D")
+        .setConfiguration(TimePartitionGlobFinder.ENABLE_VIRTUAL_PARTITION, "true");
+
+    JobExecutionResult result = embeddedGobblin.run();
+    Assert.assertTrue(result.isSuccessful());
+  }
+
+  @Test
   public void testRecompaction () throws Exception {
     FileSystem fs = getFileSystem();
     String basePath = "/tmp/testRecompaction";
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/SimpleFileSystemDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/SimpleFileSystemDataset.java
index 338596c..e1822b8 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/SimpleFileSystemDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/SimpleFileSystemDataset.java
@@ -29,7 +29,7 @@ import org.apache.gobblin.dataset.FileSystemDataset;
 public class SimpleFileSystemDataset implements FileSystemDataset {
 
   private final Path path;
-  private final boolean isVirtual;
+  private final boolean _isVirtual;
 
   public SimpleFileSystemDataset(Path path) {
     this(path, false);
@@ -37,7 +37,7 @@ public class SimpleFileSystemDataset implements FileSystemDataset {
 
   public SimpleFileSystemDataset(Path path, boolean isVirtual) {
     this.path = path;
-    this.isVirtual = isVirtual;
+    _isVirtual = isVirtual;
   }
 
   @Override
@@ -50,10 +50,8 @@ public class SimpleFileSystemDataset implements FileSystemDataset {
     return path.toString();
   }
 
-  /**
-   * @return true if the dataset doesn't have a physical file/folder
-   */
-  public boolean getIsVirtual() {
-    return isVirtual;
+  @Override
+  public boolean isVirtual() {
+    return _isVirtual;
   }
 }
\ No newline at end of file
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/time/TimeIterator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/time/TimeIterator.java
index 4630a7a..a9ef4d4 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/time/TimeIterator.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/time/TimeIterator.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.time;
 
 import java.time.ZonedDateTime;
 import java.util.Iterator;
+import java.util.NoSuchElementException;
 
 
 /**
@@ -48,28 +49,50 @@ public class TimeIterator implements Iterator {
 
   @Override
   public ZonedDateTime next() {
+    if (startTime.isAfter(endTime)) {
+      throw new NoSuchElementException();
+    }
     ZonedDateTime dateTime = startTime;
+    startTime = inc(startTime, granularity, 1);
+    return dateTime;
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
 
+  /**
+   * Increase the given time by {@code units}, which must be positive, of {@code granularity}
+   */
+  public static ZonedDateTime inc(ZonedDateTime time, Granularity granularity, long units) {
     switch (granularity) {
       case MINUTE:
-        startTime = startTime.plusMinutes(1);
-        break;
+        return time.plusMinutes(units);
       case HOUR:
-        startTime = startTime.plusHours(1);
-        break;
+        return time.plusHours(units);
       case DAY:
-        startTime = startTime.plusDays(1);
-        break;
+        return time.plusDays(units);
       case MONTH:
-        startTime = startTime.plusMonths(1);
-        break;
+        return time.plusMonths(units);
     }
-
-    return dateTime;
+    throw new RuntimeException("Unsupported granularity: " + granularity);
   }
 
-  @Override
-  public void remove() {
-    throw new UnsupportedOperationException();
+  /**
+   * Decrease the given time by {@code units}, which must be positive, of {@code granularity}
+   */
+  public static ZonedDateTime dec(ZonedDateTime time, Granularity granularity, long units) {
+    switch (granularity) {
+      case MINUTE:
+        return time.minusMinutes(units);
+      case HOUR:
+        return time.minusHours(units);
+      case DAY:
+        return time.minusDays(units);
+      case MONTH:
+        return time.minusMonths(units);
+    }
+    throw new RuntimeException("Unsupported granularity: " + granularity);
   }
 }
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java
index fca8bb3..4646029 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TimePartitionedGlobFinderTest.java
@@ -97,11 +97,11 @@ public class TimePartitionedGlobFinderTest {
     datasets = finder.findDatasets();
     Assert.assertEquals(datasets.size(), 6);
     // Verify virtual partitions for /db1/table1
-    Assert.assertTrue(find(getPartitionPath(ds1, hourlyPrefix, -1, dayFormat), datasets).getIsVirtual());
-    Assert.assertTrue(find(getPartitionPath(ds1, hourlyPrefix, -2, dayFormat), datasets).getIsVirtual());
+    Assert.assertTrue(find(getPartitionPath(ds1, hourlyPrefix, -1, dayFormat), datasets).isVirtual());
+    Assert.assertTrue(find(getPartitionPath(ds1, hourlyPrefix, -2, dayFormat), datasets).isVirtual());
     // Verify virtual partitions for /db2/table2
-    Assert.assertTrue(find(getPartitionPath(ds2, hourlyPrefix, 0, dayFormat), datasets).getIsVirtual());
-    Assert.assertTrue(find(getPartitionPath(ds2, hourlyPrefix, -2, dayFormat), datasets).getIsVirtual());
+    Assert.assertTrue(find(getPartitionPath(ds2, hourlyPrefix, 0, dayFormat), datasets).isVirtual());
+    Assert.assertTrue(find(getPartitionPath(ds2, hourlyPrefix, -2, dayFormat), datasets).isVirtual());
   }
 
  private Path getPartitionPath(Path dataset, String prefix, int dayOffset, String format) {
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/time/TimeIteratorTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/time/TimeIteratorTest.java
new file mode 100644
index 0000000..50c4e08
--- /dev/null
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/time/TimeIteratorTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.time;
+
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Unit tests for {@link TimeIterator}
+ */
+public class TimeIteratorTest {
+
+  private ZoneId zone = ZoneId.of("America/Los_Angeles");
+
+  /**
+ * A representative unit test to cover iterating. Actual computations are covered by {@link #testInc()}
+   */
+  @Test
+  public void testIterator() {
+    ZonedDateTime startTime = ZonedDateTime.of(2019,12,20,11,
+        20,30, 0, zone);
+    ZonedDateTime endTime = startTime.plusDays(12);
+    TimeIterator iterator = new TimeIterator(startTime, endTime, TimeIterator.Granularity.DAY);
+    int days = 0;
+    while (iterator.hasNext()) {
+      Assert.assertEquals(iterator.next(), startTime.plusDays(days++));
+    }
+    Assert.assertEquals(days, 13);
+  }
+
+  @Test
+  public void testInc() {
+    ZonedDateTime startTime = ZonedDateTime.of(2019,12,20,11,
+        20,30, 0, zone);
+    Assert.assertEquals(TimeIterator.inc(startTime, TimeIterator.Granularity.MINUTE, 40).toString(),
+        "2019-12-20T12:00:30-08:00[America/Los_Angeles]");
+    Assert.assertEquals(TimeIterator.inc(startTime, TimeIterator.Granularity.HOUR, 13).toString(),
+        "2019-12-21T00:20:30-08:00[America/Los_Angeles]");
+    Assert.assertEquals(TimeIterator.inc(startTime, TimeIterator.Granularity.DAY, 12).toString(),
+        "2020-01-01T11:20:30-08:00[America/Los_Angeles]");
+    Assert.assertEquals(TimeIterator.inc(startTime, TimeIterator.Granularity.MONTH, 1).toString(),
+        "2020-01-20T11:20:30-08:00[America/Los_Angeles]");
+  }
+
+  @Test
+  public void testDec() {
+    ZonedDateTime startTime = ZonedDateTime.of(2019,12,20,11,
+        20,30, 0, zone);
+    Assert.assertEquals(TimeIterator.dec(startTime, TimeIterator.Granularity.MINUTE, 21).toString(),
+        "2019-12-20T10:59:30-08:00[America/Los_Angeles]");
+    Assert.assertEquals(TimeIterator.dec(startTime, TimeIterator.Granularity.HOUR, 12).toString(),
+        "2019-12-19T23:20:30-08:00[America/Los_Angeles]");
+    Assert.assertEquals(TimeIterator.dec(startTime, TimeIterator.Granularity.DAY, 20).toString(),
+        "2019-11-30T11:20:30-08:00[America/Los_Angeles]");
+    Assert.assertEquals(TimeIterator.dec(startTime, TimeIterator.Granularity.MONTH, 12).toString(),
+        "2018-12-20T11:20:30-08:00[America/Los_Angeles]");
+  }
+}
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
index a25efdd..15f5982 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import lombok.extern.slf4j.Slf4j;
@@ -473,6 +474,10 @@ public class HiveMetaStoreBasedRegister extends HiveRegister {
  public void dropPartitionIfExists(String dbName, String tableName, List<Column> partitionKeys,
       List<String> partitionValues) throws IOException {
    try (AutoReturnableObject<IMetaStoreClient> client = this.clientPool.getClient()) {
+      if (client.get().getPartition(dbName, tableName, partitionValues) == null) {
+        // Partition does not exist. Nothing to do
+        return;
+      }
      try (Timer.Context context = this.metricContext.timer(DROP_TABLE).time()) {
         client.get().dropPartition(dbName, tableName, partitionValues, false);
       }
@@ -755,6 +760,12 @@ public class HiveMetaStoreBasedRegister extends HiveRegister {
    * @param existingTable
    */
  protected Table getNewTblByMergingExistingTblProps(Table newTable, HiveTable existingTable) {
-    return getTableWithCreateTime(newTable, existingTable);
+    Table table = getTableWithCreateTime(newTable, existingTable);
+    // Get existing parameters
+    Map<String, String> allParameters = HiveMetaStoreUtils.getParameters(existingTable.getProps());
+    // Apply new parameters
+    allParameters.putAll(table.getParameters());
+    table.setParameters(allParameters);
+    return table;
   }
 }
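
The merge above means parameters already present on the registered Hive table are preserved, and the new registration only overrides keys it sets itself. A toy illustration of the merge semantics with hypothetical values (not part of this commit):

    import java.util.HashMap;
    import java.util.Map;

    public class TableParamMergeSketch {
      public static void main(String[] args) {
        Map<String, String> existingParams = new HashMap<>();
        existingParams.put("owner", "team-a");        // set by an earlier registration
        existingParams.put("retention.days", "30");

        Map<String, String> newParams = new HashMap<>();
        newParams.put("retention.days", "7");         // this registration overrides one key

        // Same order as allParameters.putAll(table.getParameters()) above:
        // start from the existing parameters, then overlay the new ones.
        Map<String, String> merged = new HashMap<>(existingParams);
        merged.putAll(newParams);
        System.out.println(merged);                   // contains owner=team-a, retention.days=7
      }
    }
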
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
index 976c9e3..7957e62 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
@@ -189,7 +189,7 @@ public class HiveMetaStoreUtils {
     return hivePartition;
   }
 
-  private static Map<String, String> getParameters(State props) {
+  public static Map<String, String> getParameters(State props) {
     Map<String, String> parameters = Maps.newHashMap();
     if (props.contains(RUNTIME_PROPS)) {
       String runtimePropsString = props.getProp(RUNTIME_PROPS);
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
index b868893..755d972 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
@@ -441,9 +441,10 @@ public class TaskExecutor extends AbstractIdleService {
       onStart(startTime);
       try {
         this.underlyingTask.run();
-        successfulTaskCount.mark();;
+        successfulTaskCount.mark();
       } catch (Exception e) {
         failedTaskCount.mark();
+        LOG.error(String.format("Task %s failed", underlyingTask.getTaskId()), e);
         throw e;
       } finally {
         runningTaskCount.dec();
