Repository: incubator-gobblin
Updated Branches:
  refs/heads/master d6f0112a9 -> 6403d247e


[GOBBLIN-473] Allow user to configure different lookback times for compaction

Closes #2345 from yukuai518/cc


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/6403d247
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/6403d247
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/6403d247

Branch: refs/heads/master
Commit: 6403d247e06ca72ee2757f44788a6f5b8cd85ed0
Parents: d6f0112
Author: Kuai Yu <[email protected]>
Authored: Tue Apr 24 15:14:31 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Tue Apr 24 15:14:31 2018 -0700

----------------------------------------------------------------------
 .../verify/CompactionTimeRangeVerifier.java     |  56 +++++++++-
 .../verify/CompactionTimeVerifierTest.java      | 101 +++++++++++++++++++
 2 files changed, 155 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6403d247/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
----------------------------------------------------------------------
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
index 06abd0a..5b49193 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
@@ -17,6 +17,10 @@
 
 package org.apache.gobblin.compaction.verify;
 
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
@@ -24,6 +28,10 @@ import org.joda.time.Period;
 import org.joda.time.format.PeriodFormatter;
 import org.joda.time.format.PeriodFormatterBuilder;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Maps;
+
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
@@ -57,13 +65,18 @@ public class CompactionTimeRangeVerifier implements 
CompactionVerifier<FileSyste
       PeriodFormatter formatter = new 
PeriodFormatterBuilder().appendMonths().appendSuffix("m").appendDays().appendSuffix("d").appendHours()
               .appendSuffix("h").toFormatter();
 
+      // Dataset name is like 'Identity/MemberAccount' or 'PageViewEvent'
+      String datasetName = result.getDatasetName();
+
       // get earliest time
-      String maxTimeAgoStr = 
this.state.getProp(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MAX_TIME_AGO,
 TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MAX_TIME_AGO);
+      String maxTimeAgoStrList = 
this.state.getProp(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MAX_TIME_AGO,
 TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MAX_TIME_AGO);
+      String maxTimeAgoStr = getMachedLookbackTime(datasetName, 
maxTimeAgoStrList, 
TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MAX_TIME_AGO);
       Period maxTimeAgo = formatter.parsePeriod(maxTimeAgoStr);
       earliest = compactionStartTime.minus(maxTimeAgo);
 
       // get latest time
-      String minTimeAgoStr = 
this.state.getProp(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MIN_TIME_AGO,
 TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MIN_TIME_AGO);
+      String minTimeAgoStrList = 
this.state.getProp(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MIN_TIME_AGO,
 TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MIN_TIME_AGO);
+      String minTimeAgoStr = getMachedLookbackTime(datasetName, 
minTimeAgoStrList, 
TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MIN_TIME_AGO);
       Period minTimeAgo = formatter.parsePeriod(minTimeAgoStr);
       latest = compactionStartTime.minus(minTimeAgo);
 
@@ -85,4 +98,43 @@ public class CompactionTimeRangeVerifier implements 
CompactionVerifier<FileSyste
   public boolean isRetriable () {
     return false;
   }
+
+  /**
+   * Finds the lookback time configured for a given dataset.
+   *
+   * <p>{@code datasetsAndLookBacks} is split on ';' into entries, and each entry is split on ':'
+   * into pieces. Surrounding whitespace is trimmed and empty pieces are dropped at both levels.
+   * Entries are then processed in the order they appear:
+   * <ul>
+   *   <li>An entry with two pieces is a {@code regex:lookback} pair. The lookback of the first
+   *       pair whose regex is found anywhere in {@code datasetName} is returned immediately;
+   *       later entries are not examined.</li>
+   *   <li>An entry with a single piece is taken as the default lookback. If several such entries
+   *       are present, the last one processed replaces the earlier ones.</li>
+   *   <li>An entry with any other number of pieces is reported through {@code log.error} and
+   *       skipped.</li>
+   * </ul>
+   * If no pair matches, the default taken from the list is returned, or
+   * {@code sysDefaultLookback} when the list does not contain a default.
+   *
+   * <p>Example format: [Regex1]:[T1];[Regex2]:[T2];[DEFAULT_T];[Regex3]:[T3]
+   * <br>Example: Identity.*:1d2h;22h;BizProfile.BizCompany:3h (22h is the default lookback time)
+   *
+   * <p>NOTE(review): because empty pieces are dropped, an entry such as "Regex:" (lookback missing)
+   * collapses to a single piece and is treated as a default lookback instead of being rejected.
+   * A regex that itself contains ':' produces more than two pieces and is skipped as invalid.
+   *
+   * @param datasetName A description of the dataset without time partition information,
+   *                    for example 'Identity/MemberAccount' or 'PageViewEvent'.
+   * @param datasetsAndLookBacks Lookback string covering multiple datasets. Datasets are
+   *                             represented by regex patterns, and the 'dataset and lookback'
+   *                             pairs are joined by semicolons. A default lookback time can be
+   *                             given as an entry without any regex prefix.
+   * @param sysDefaultLookback The system default lookback time. It is returned when no regex in
+   *                           {@code datasetsAndLookBacks} matches {@code datasetName} and
+   *                           {@code datasetsAndLookBacks} contains no default entry.
+   * @return The lookback time matched with the given dataset.
+   */
+  public static String getMachedLookbackTime (String datasetName, String 
datasetsAndLookBacks, String sysDefaultLookback) {
+    String defaultLookback = sysDefaultLookback;
+
+    for (String entry : Splitter.on(";").trimResults()
+        .omitEmptyStrings().splitToList(datasetsAndLookBacks)) {
+      List<String> datasetAndLookbackTime = 
Splitter.on(":").trimResults().omitEmptyStrings().splitToList(entry);
+      if (datasetAndLookbackTime.size() == 1) { // bare value: remember it as the fallback, keep scanning
+        defaultLookback = datasetAndLookbackTime.get(0);
+      } else if (datasetAndLookbackTime.size() == 2)  {
+        String regex = datasetAndLookbackTime.get(0);
+        if (Pattern.compile(regex).matcher(datasetName).find()) { // find(): a partial match is enough
+          return datasetAndLookbackTime.get(1);
+        }
+      } else { // malformed entry: log it and continue with the remaining entries
+        log.error("Invalid format in {}, {} cannot find its lookback time", 
datasetsAndLookBacks, datasetName);
+      }
+    }
+    return defaultLookback;
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6403d247/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/CompactionTimeVerifierTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/CompactionTimeVerifierTest.java
 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/CompactionTimeVerifierTest.java
new file mode 100644
index 0000000..654e195
--- /dev/null
+++ 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/CompactionTimeVerifierTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.verify;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class CompactionTimeVerifierTest { // Exercises CompactionTimeRangeVerifier.getMachedLookbackTime with various lookback strings
+
+  @Test
+  public void testOneDatasetTime() { // one regex entry, alone and after a bare default entry
+    String timeString = "Identity.MemberAccount:1d2h";
+
+    String lb1 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("Identity/MemberAccount", 
timeString, "2d");
+    Assert.assertEquals(lb1, "1d2h");
+    String lb2 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("BizProfile/BizCompany", 
timeString, "2d");
+    Assert.assertEquals(lb2, "2d");
+
+    timeString = "2d;Identity.MemberAccount:1d2h";
+    String lb3 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("Identity/MemberAccount", 
timeString, "2d");
+    Assert.assertEquals(lb3, "1d2h");
+    String lb4 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("BizProfile/BizCompany", 
timeString, "2d");
+    Assert.assertEquals(lb4, "2d");
+  }
+
+  @Test
+  public void testTwoDatasetTime() { // two regex entries; a dataset matching neither gets the default
+    String timeString = "Identity.*:1d2h;BizProfile.BizCompany:3d";
+
+    String lb1 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("Identity/MemberAccount", 
timeString, "2d");
+    Assert.assertEquals(lb1, "1d2h");
+    String lb2 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("BizProfile/BizCompany", 
timeString, "2d");
+    Assert.assertEquals(lb2, "3d");
+    String lb3 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("ABC/Unknown", timeString, 
"2d");
+    Assert.assertEquals(lb3, "2d");
+
+    timeString = "2d;Identity.MemberAccount:1d2h;BizProfile.BizCompany:3d";
+    String lb4 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("Identity/MemberAccount", 
timeString, "2d");
+    Assert.assertEquals(lb4, "1d2h");
+    String lb5 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("BizProfile/BizCompany", 
timeString, "2d");
+    Assert.assertEquals(lb5, "3d");
+    String lb6 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("ABC/Unknown", timeString, 
"2d");
+    Assert.assertEquals(lb6, "2d");
+  }
+
+  @Test
+  public void testDefaultDatasetTime() { // a bare entry in the list overrides the system default argument
+    String timeString = "Identity.*:1d2h;3d2h;BizProfile.BizCompany:3d";
+    String lb1 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("ABC/Unknown", timeString, 
"2d");
+    Assert.assertEquals(lb1, "3d2h");
+
+    timeString = "3d2h";
+    String lb2 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("ABC/Unknown", timeString, 
"2d");
+    Assert.assertEquals(lb2, "3d2h");
+  }
+
+  @Test
+  public void testEmptySpace() { // whitespace around the ':' and ';' separators does not affect the result
+    String timeString = "Identity.* :   1d2h ; BizProfile.BizCompany : 3d";
+
+    String lb1 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("Identity/MemberAccount", 
timeString, "2d");
+    Assert.assertEquals(lb1, "1d2h");
+    String lb2 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("BizProfile/BizCompany", 
timeString, "2d");
+    Assert.assertEquals(lb2, "3d");
+    String lb3 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("ABC/Unknown", timeString, 
"2d");
+    Assert.assertEquals(lb3, "2d");
+
+    timeString = "2d;Identity.MemberAccount  :1d2h;   
BizProfile.BizCompany:3d";
+    String lb4 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("Identity/MemberAccount", 
timeString, "2d");
+    Assert.assertEquals(lb4, "1d2h");
+    String lb5 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("BizProfile/BizCompany", 
timeString, "2d");
+    Assert.assertEquals(lb5, "3d");
+    String lb6 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("ABC/Unknown", timeString, 
"2d");
+    Assert.assertEquals(lb6, "2d");
+  }
+
+  @Test
+  public void testPartialMatchedNames() { // a regex anchored with '$' must not match a longer dataset name
+    String timeString = "Identity.Member$ :   1d2h";
+    String lb1 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("Identity/Member", 
timeString, "2d");
+    Assert.assertEquals(lb1, "1d2h");
+    String lb2 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("Identity/MemberAccount", 
timeString, "2d");
+    Assert.assertEquals(lb2, "2d");
+  }
+}

Reply via email to