Repository: incubator-gobblin Updated Branches: refs/heads/master d6f0112a9 -> 6403d247e
[GOBBLIN-473] Allow user to configure different lookback times for compaction Closes #2345 from yukuai518/cc Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/6403d247 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/6403d247 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/6403d247 Branch: refs/heads/master Commit: 6403d247e06ca72ee2757f44788a6f5b8cd85ed0 Parents: d6f0112 Author: Kuai Yu <[email protected]> Authored: Tue Apr 24 15:14:31 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Tue Apr 24 15:14:31 2018 -0700 ---------------------------------------------------------------------- .../verify/CompactionTimeRangeVerifier.java | 56 +++++++++- .../verify/CompactionTimeVerifierTest.java | 101 +++++++++++++++++++ 2 files changed, 155 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6403d247/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java index 06abd0a..5b49193 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java @@ -17,6 +17,10 @@ package org.apache.gobblin.compaction.verify; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; + import org.apache.commons.lang.exception.ExceptionUtils; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -24,6 +28,10 @@ import 
org.joda.time.Period; import org.joda.time.format.PeriodFormatter; import org.joda.time.format.PeriodFormatterBuilder; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.collect.Maps; + import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -57,13 +65,18 @@ public class CompactionTimeRangeVerifier implements CompactionVerifier<FileSyste PeriodFormatter formatter = new PeriodFormatterBuilder().appendMonths().appendSuffix("m").appendDays().appendSuffix("d").appendHours() .appendSuffix("h").toFormatter(); + // Dataset name is like 'Identity/MemberAccount' or 'PageViewEvent' + String datasetName = result.getDatasetName(); + // get earliest time - String maxTimeAgoStr = this.state.getProp(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MAX_TIME_AGO, TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MAX_TIME_AGO); + String maxTimeAgoStrList = this.state.getProp(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MAX_TIME_AGO, TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MAX_TIME_AGO); + String maxTimeAgoStr = getMachedLookbackTime(datasetName, maxTimeAgoStrList, TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MAX_TIME_AGO); Period maxTimeAgo = formatter.parsePeriod(maxTimeAgoStr); earliest = compactionStartTime.minus(maxTimeAgo); // get latest time - String minTimeAgoStr = this.state.getProp(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MIN_TIME_AGO, TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MIN_TIME_AGO); + String minTimeAgoStrList = this.state.getProp(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MIN_TIME_AGO, TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MIN_TIME_AGO); + String minTimeAgoStr = getMachedLookbackTime(datasetName, minTimeAgoStrList, TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MIN_TIME_AGO); Period minTimeAgo = formatter.parsePeriod(minTimeAgoStr); latest = 
compactionStartTime.minus(minTimeAgo); @@ -85,4 +98,43 @@ public class CompactionTimeRangeVerifier implements CompactionVerifier<FileSyste public boolean isRetriable () { return false; } + + /** + * Find the correct lookback time for a given dataset. + * + * @param datasetsAndLookBacks Lookback string for multiple datasets. Each dataset is represented by a regex pattern. + * Multiple 'datasets and lookback' pairs are joined by semicolons. A default + * lookback time can be given without any Regex prefix. If nothing is found, we will use + * {@code sysDefaultLookback}. + * + * Example Format: [Regex1]:[T1];[Regex2]:[T2];[DEFAULT_T];[Regex3]:[T3] + * Ex. Identity.*:1d2h;22h;BizProfile.BizCompany:3h (22h is default lookback time) + * + * @param sysDefaultLookback If the user doesn't specify any lookback time for {@code datasetName}, and there is no default + * lookback time inside {@code datasetsAndLookBacks}, this system default lookback time is returned. + * + * @param datasetName A description of the dataset without time partition information. Example 'Identity/MemberAccount' or 'PageViewEvent' + * @return The lookback time matched with the given dataset. 
+ */ + public static String getMachedLookbackTime (String datasetName, String datasetsAndLookBacks, String sysDefaultLookback) { + String defaultLookback = sysDefaultLookback; + + for (String entry : Splitter.on(";").trimResults() + .omitEmptyStrings().splitToList(datasetsAndLookBacks)) { + List<String> datasetAndLookbackTime = Splitter.on(":").trimResults().omitEmptyStrings().splitToList(entry); + if (datasetAndLookbackTime.size() == 1) { + defaultLookback = datasetAndLookbackTime.get(0); + } else if (datasetAndLookbackTime.size() == 2) { + String regex = datasetAndLookbackTime.get(0); + if (Pattern.compile(regex).matcher(datasetName).find()) { + return datasetAndLookbackTime.get(1); + } + } else { + log.error("Invalid format in {}, {} cannot find its lookback time", datasetsAndLookBacks, datasetName); + } + } + return defaultLookback; + } + + } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6403d247/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/CompactionTimeVerifierTest.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/CompactionTimeVerifierTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/CompactionTimeVerifierTest.java new file mode 100644 index 0000000..654e195 --- /dev/null +++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/CompactionTimeVerifierTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.compaction.verify; + +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class CompactionTimeVerifierTest { + + @Test + public void testOneDatasetTime() { + String timeString = "Identity.MemberAccount:1d2h"; + + String lb1 = CompactionTimeRangeVerifier.getMachedLookbackTime("Identity/MemberAccount", timeString, "2d"); + Assert.assertEquals(lb1, "1d2h"); + String lb2 = CompactionTimeRangeVerifier.getMachedLookbackTime("BizProfile/BizCompany", timeString, "2d"); + Assert.assertEquals(lb2, "2d"); + + timeString = "2d;Identity.MemberAccount:1d2h"; + String lb3 = CompactionTimeRangeVerifier.getMachedLookbackTime("Identity/MemberAccount", timeString, "2d"); + Assert.assertEquals(lb3, "1d2h"); + String lb4 = CompactionTimeRangeVerifier.getMachedLookbackTime("BizProfile/BizCompany", timeString, "2d"); + Assert.assertEquals(lb4, "2d"); + } + + @Test + public void testTwoDatasetTime() { + String timeString = "Identity.*:1d2h;BizProfile.BizCompany:3d"; + + String lb1 = CompactionTimeRangeVerifier.getMachedLookbackTime("Identity/MemberAccount", timeString, "2d"); + Assert.assertEquals(lb1, "1d2h"); + String lb2 = CompactionTimeRangeVerifier.getMachedLookbackTime("BizProfile/BizCompany", timeString, "2d"); + Assert.assertEquals(lb2, "3d"); + String lb3 = CompactionTimeRangeVerifier.getMachedLookbackTime("ABC/Unknown", timeString, "2d"); + Assert.assertEquals(lb3, "2d"); + + timeString = "2d;Identity.MemberAccount:1d2h;BizProfile.BizCompany:3d"; + String lb4 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("Identity/MemberAccount", timeString, "2d"); + Assert.assertEquals(lb4, "1d2h"); + String lb5 = CompactionTimeRangeVerifier.getMachedLookbackTime("BizProfile/BizCompany", timeString, "2d"); + Assert.assertEquals(lb5, "3d"); + String lb6 = CompactionTimeRangeVerifier.getMachedLookbackTime("ABC/Unknown", timeString, "2d"); + Assert.assertEquals(lb6, "2d"); + } + + @Test + public void testDefaultDatasetTime() { + String timeString = "Identity.*:1d2h;3d2h;BizProfile.BizCompany:3d"; + String lb1 = CompactionTimeRangeVerifier.getMachedLookbackTime("ABC/Unknown", timeString, "2d"); + Assert.assertEquals(lb1, "3d2h"); + + timeString = "3d2h"; + String lb2 = CompactionTimeRangeVerifier.getMachedLookbackTime("ABC/Unknown", timeString, "2d"); + Assert.assertEquals(lb2, "3d2h"); + } + + @Test + public void testEmptySpace() { + String timeString = "Identity.* : 1d2h ; BizProfile.BizCompany : 3d"; + + String lb1 = CompactionTimeRangeVerifier.getMachedLookbackTime("Identity/MemberAccount", timeString, "2d"); + Assert.assertEquals(lb1, "1d2h"); + String lb2 = CompactionTimeRangeVerifier.getMachedLookbackTime("BizProfile/BizCompany", timeString, "2d"); + Assert.assertEquals(lb2, "3d"); + String lb3 = CompactionTimeRangeVerifier.getMachedLookbackTime("ABC/Unknown", timeString, "2d"); + Assert.assertEquals(lb3, "2d"); + + timeString = "2d;Identity.MemberAccount :1d2h; BizProfile.BizCompany:3d"; + String lb4 = CompactionTimeRangeVerifier.getMachedLookbackTime("Identity/MemberAccount", timeString, "2d"); + Assert.assertEquals(lb4, "1d2h"); + String lb5 = CompactionTimeRangeVerifier.getMachedLookbackTime("BizProfile/BizCompany", timeString, "2d"); + Assert.assertEquals(lb5, "3d"); + String lb6 = CompactionTimeRangeVerifier.getMachedLookbackTime("ABC/Unknown", timeString, "2d"); + Assert.assertEquals(lb6, "2d"); + } + + @Test + public void testPartialMatchedNames() { + String timeString = "Identity.Member$ : 1d2h"; + String lb1 = 
CompactionTimeRangeVerifier.getMachedLookbackTime("Identity/Member", timeString, "2d"); + Assert.assertEquals(lb1, "1d2h"); + String lb2 = CompactionTimeRangeVerifier.getMachedLookbackTime("Identity/MemberAccount", timeString, "2d"); + Assert.assertEquals(lb2, "2d"); + } +}
