This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 21dd436 [GOBBLIN-1233][GOBBLIN-831] Add case-aware support in
WhitelistBlacklist and other small fixes
21dd436 is described below
commit 21dd43619ba769143f9afa87ca89b37dacd67756
Author: zhchen <[email protected]>
AuthorDate: Fri Aug 14 16:24:41 2020 -0700
[GOBBLIN-1233][GOBBLIN-831] Add case-aware support in WhitelistBlacklist
and other small fixes
Closes #3078 from zxcware/misc
---
.../cluster/GobblinHelixJobLauncherTest.java | 1 +
.../partitioner/TimeBasedWriterPartitioner.java | 3 ++
.../runtime/retention/DatasetCleanerTask.java | 11 +++++
.../copy/hive/WhitelistBlacklistTest.java | 24 +++++++++-
.../org/apache/gobblin/writer/AsyncHttpWriter.java | 2 +-
.../apache/gobblin/writer/AsyncHttpWriterTest.java | 2 +-
.../packer/KafkaBiLevelWorkUnitPacker.java | 4 ++
.../packer/KafkaSingleLevelWorkUnitPacker.java | 4 ++
.../kafka/workunit/packer/KafkaWorkUnitPacker.java | 10 ++--
.../workunit/packer/KafkaWorkUnitPackerTest.java | 16 +++++++
.../management/copy/hive/WhitelistBlacklist.java | 56 +++++++++++++++-------
11 files changed, 110 insertions(+), 23 deletions(-)
diff --git
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
index 1f98807..7625540 100644
---
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
+++
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
@@ -286,6 +286,7 @@ public class GobblinHelixJobLauncherTest {
Assert.assertEquals(testListener.getCompletes().get() == 1, true);
}
+ @Test(dependsOnMethods = {"testLaunchJob", "testLaunchMultipleJobs"})
public void testJobCleanup() throws Exception {
final ConcurrentHashMap<String, Boolean> runningMap = new
ConcurrentHashMap<>();
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedWriterPartitioner.java
b/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedWriterPartitioner.java
index 5ec2c10..2c84a32 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedWriterPartitioner.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedWriterPartitioner.java
@@ -36,6 +36,8 @@ import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import lombok.Getter;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.util.DatePartitionType;
@@ -74,6 +76,7 @@ public abstract class TimeBasedWriterPartitioner<D>
implements WriterPartitioner
private final String writerPartitionSuffix;
private final DatePartitionType granularity;
private final DateTimeZone timeZone;
+ @Getter
protected final TimeUnit timeUnit;
private final Optional<DateTimeFormatter> timestampToPathFormatter;
private final Schema schema;
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/retention/DatasetCleanerTask.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/retention/DatasetCleanerTask.java
index b46e17c..4c5cbd2 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/retention/DatasetCleanerTask.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/retention/DatasetCleanerTask.java
@@ -22,6 +22,9 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.data.management.retention.DatasetCleaner;
import org.apache.gobblin.runtime.TaskContext;
import org.apache.gobblin.runtime.task.BaseAbstractTask;
@@ -30,6 +33,7 @@ import org.apache.gobblin.runtime.task.BaseAbstractTask;
/**
* A task that runs a DatasetCleaner job.
*/
+@Slf4j
public class DatasetCleanerTask extends BaseAbstractTask {
private static final String JOB_CONFIGURATION_PREFIX = "datasetCleaner";
@@ -47,8 +51,15 @@ public class DatasetCleanerTask extends BaseAbstractTask {
DatasetCleaner datasetCleaner = new DatasetCleaner(FileSystem.get(new
Configuration()),
this.taskContext.getTaskState().getProperties());
datasetCleaner.clean();
+ this.workingState = WorkUnitState.WorkingState.SUCCESSFUL;
} catch (IOException e) {
+ this.workingState = WorkUnitState.WorkingState.FAILED;
throw new RuntimeException(e);
}
}
+
+ @Override
+ public void commit() {
+ log.info("task {} commits with state {}", this.taskContext.getTaskState().getTaskId(), this.workingState);
+ }
}
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/WhitelistBlacklistTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/WhitelistBlacklistTest.java
index 564cdf2..e5f056a 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/WhitelistBlacklistTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/WhitelistBlacklistTest.java
@@ -17,10 +17,15 @@
package org.apache.gobblin.data.management.copy.hive;
+import java.util.Map;
+
import org.testng.annotations.Test;
import org.testng.Assert;
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+
public class WhitelistBlacklistTest {
@@ -159,8 +164,25 @@ public class WhitelistBlacklistTest {
}
@Test
+ public void testCaseAware() throws Exception {
+ WhitelistBlacklist whitelistBlacklist = new WhitelistBlacklist("dB*.Table*", "*.Tablea", false);
+ Assert.assertTrue(whitelistBlacklist.acceptDb("dBa"));
+ Assert.assertTrue(whitelistBlacklist.acceptDb("dBb"));
+ Assert.assertFalse(whitelistBlacklist.acceptDb("dbb"));
+
+ Assert.assertFalse(whitelistBlacklist.acceptTable("dBa", "Tablea"));
+ Assert.assertTrue(whitelistBlacklist.acceptTable("dBa", "Tableb"));
+ Assert.assertFalse(whitelistBlacklist.acceptTable("dbb", "Tableb"));
+ Assert.assertFalse(whitelistBlacklist.acceptTable("dBb", "tableb"));
+ }
+
+ @Test
public void testWhitelistBlacklist() throws Exception {
- WhitelistBlacklist whitelistBlacklist = new WhitelistBlacklist("dba", "dba.tablea");
+ Map<String, String> configMap = Maps.newHashMap();
+ configMap.put("whitelist", "dba");
+ configMap.put("blacklist", "dba.tablea");
+ WhitelistBlacklist whitelistBlacklist = new WhitelistBlacklist(ConfigFactory.parseMap(configMap));
+
Assert.assertTrue(whitelistBlacklist.acceptDb("dba"));
Assert.assertFalse(whitelistBlacklist.acceptDb("dbb"));
diff --git
a/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/writer/AsyncHttpWriter.java
b/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/writer/AsyncHttpWriter.java
index 6c1b105..6e8b9a2 100644
---
a/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/writer/AsyncHttpWriter.java
+++
b/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/writer/AsyncHttpWriter.java
@@ -83,7 +83,7 @@ public class AsyncHttpWriter<D, RQ, RP> extends
AbstractAsyncDataWriter<D> {
while (attempt < maxAttempts) {
try {
response = httpClient.sendRequest(rawRequest);
- } catch (IOException e) {
+ } catch (Exception e) {
// Retry
attempt++;
if (attempt == maxAttempts) {
diff --git
a/gobblin-modules/gobblin-http/src/test/java/org/apache/gobblin/writer/AsyncHttpWriterTest.java
b/gobblin-modules/gobblin-http/src/test/java/org/apache/gobblin/writer/AsyncHttpWriterTest.java
index 011c7f7..4962529 100644
---
a/gobblin-modules/gobblin-http/src/test/java/org/apache/gobblin/writer/AsyncHttpWriterTest.java
+++
b/gobblin-modules/gobblin-http/src/test/java/org/apache/gobblin/writer/AsyncHttpWriterTest.java
@@ -234,7 +234,7 @@ public class AsyncHttpWriterTest {
// We won't consume the response anyway
return null;
}
- throw new IOException("Send failed");
+ throw new RuntimeException("Send failed");
}
@Override
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
index 85ba232..e86c43e 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
@@ -64,6 +64,10 @@ public class KafkaBiLevelWorkUnitPacker extends
KafkaWorkUnitPacker {
@Override
public List<WorkUnit> pack(Map<String, List<WorkUnit>> workUnitsByTopic, int
numContainers) {
+ if (workUnitsByTopic == null || workUnitsByTopic.isEmpty()) {
+ return Lists.newArrayList();
+ }
+
double totalEstDataSize = setWorkUnitEstSizes(workUnitsByTopic);
double avgGroupSize = totalEstDataSize / numContainers /
getPreGroupingSizeFactor(this.state);
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaSingleLevelWorkUnitPacker.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaSingleLevelWorkUnitPacker.java
index 8c71a6a..1025139 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaSingleLevelWorkUnitPacker.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaSingleLevelWorkUnitPacker.java
@@ -49,6 +49,10 @@ public class KafkaSingleLevelWorkUnitPacker extends
KafkaWorkUnitPacker {
@Override
public List<WorkUnit> pack(Map<String, List<WorkUnit>> workUnitsByTopic, int
numContainers) {
+ if (workUnitsByTopic == null || workUnitsByTopic.isEmpty()) {
+ return Lists.newArrayList();
+ }
+
setWorkUnitEstSizes(workUnitsByTopic);
List<WorkUnit> workUnits = Lists.newArrayList();
for (List<WorkUnit> workUnitsForTopic : workUnitsByTopic.values()) {
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
index 1d34aec..786fe2a 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
@@ -329,15 +329,17 @@ public abstract class KafkaWorkUnitPacker {
pQueue.add(lightestMultiWorkUnit);
}
LinkedList<MultiWorkUnit> pQueue_filtered = new LinkedList();
- while(!pQueue.isEmpty())
- {
+ while(!pQueue.isEmpty()) {
MultiWorkUnit multiWorkUnit = pQueue.poll();
- if(multiWorkUnit.getWorkUnits().size() != 0)
- {
+ if(multiWorkUnit.getWorkUnits().size() != 0) {
pQueue_filtered.offer(multiWorkUnit);
}
}
+ if (pQueue_filtered.isEmpty()) {
+ return Lists.newArrayList();
+ }
+
logMultiWorkUnitInfo(pQueue_filtered);
double minLoad = getWorkUnitEstLoad(pQueue_filtered.peekFirst());
double maxLoad = getWorkUnitEstLoad(pQueue_filtered.peekLast());
diff --git
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPackerTest.java
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPackerTest.java
index 89a81d0..4317c89 100644
---
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPackerTest.java
+++
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPackerTest.java
@@ -25,6 +25,8 @@ import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import com.google.common.collect.Maps;
+
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.source.extractor.extract.AbstractSource;
import org.apache.gobblin.source.workunit.WorkUnit;
@@ -66,6 +68,20 @@ public class KafkaWorkUnitPackerTest {
Assert.assertTrue(anotherPacker instanceof KafkaSingleLevelWorkUnitPacker);
}
+ @Test
+ public void testPackEmptyWorkUnit() {
+ SourceState sourceState = new SourceState(state);
+ Map<String, List<WorkUnit>> emptyWorkUnit = Maps.newHashMap();
+ // Test single level packer
+ KafkaWorkUnitPacker mypacker = KafkaWorkUnitPacker.getInstance(source, sourceState);
+ Assert.assertEquals(mypacker.pack(emptyWorkUnit, 1).size(), 0);
+ // Test bi level packer
+ sourceState.setProp(KAFKA_WORKUNIT_PACKER_CUSTOMIZED_TYPE,
+ "org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaBiLevelWorkUnitPacker");
+ mypacker = KafkaWorkUnitPacker.getInstance(source, sourceState);
+ Assert.assertEquals(mypacker.pack(emptyWorkUnit, 1).size(), 0);
+ }
+
public class TestKafkaWorkUnitPacker extends KafkaWorkUnitPacker {
public TestKafkaWorkUnitPacker(AbstractSource<?, ?> source, SourceState
state) {
super(source, state);
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/data/management/copy/hive/WhitelistBlacklist.java
b/gobblin-utility/src/main/java/org/apache/gobblin/data/management/copy/hive/WhitelistBlacklist.java
index 346110c..c5f7246 100644
---
a/gobblin-utility/src/main/java/org/apache/gobblin/data/management/copy/hive/WhitelistBlacklist.java
+++
b/gobblin-utility/src/main/java/org/apache/gobblin/data/management/copy/hive/WhitelistBlacklist.java
@@ -33,43 +33,62 @@ import com.typesafe.config.Config;
/**
- * A whitelist / blacklist implementation for filtering Hive tables. Parses input whitelist and blacklist of the form
+ * A whitelist / blacklist implementation for filtering Hive tables. It can be configured as
+ * case-insensitive({@code ignoreCase = true}) or case-sensitive({@code ignoreCase = false}). By default, it's
+ * case-insensitive. <br>
+ *
+ * <p></p>
+ * Parses input whitelist and blacklist of the form
* [dbpattern.tablepattern1|tablepattern2|...],... and filters accordingly.
The db and table patterns accept "*"
* characters. Each of whitelist and blacklist is a list of patterns. For a
table to be accepted, it must fail the
* blacklist filter and pass the whitelist filter. Empty whitelist or
blacklist are noops.
*
+ * <p></p>
* <p>
* Example whitelist and blacklist patterns:
- * * db1.table1 -> only db1.table1 passes.
- * * db1 -> any table under db1 passes.
- * * db1.table* -> any table under db1 whose name satisfies the pattern table* passes.
- * * db* -> all tables from all databases whose names satisfy the pattern db* pass.
- * * db*.table* -> db and table must satisfy the patterns db* and table* respectively
- * * db1.table1,db2.table2 -> combine expressions for different databases with comma.
- * * db1.table1|table2 -> combine expressions for same database with "|".
+ * <li> db1.table1 -> only db1.table1 passes.
+ * <li> db1 -> any table under db1 passes.
+ * <li> db1.table* -> any table under db1 whose name satisfies the pattern table* passes.
+ * <li> db* -> all tables from all databases whose names satisfy the pattern db* pass.
+ * <li> db*.table* -> db and table must satisfy the patterns db* and table* respectively
+ * <li> db1.table1,db2.table2 -> combine expressions for different databases with comma.
+ * <li> db1.table1|table2 -> combine expressions for same database with "|".
* </p>
*/
public class WhitelistBlacklist implements Serializable {
public static final String WHITELIST = "whitelist";
public static final String BLACKLIST = "blacklist";
+ public static final String IGNORE_CASE = "whitelistBlacklist.ignoreCase";
private static final Pattern ALL_TABLES = Pattern.compile(".*");
private final SetMultimap<Pattern, Pattern> whitelistMultimap;
private final SetMultimap<Pattern, Pattern> blacklistMultimap;
+ private final boolean ignoreCase;
public WhitelistBlacklist(Config config) throws IOException {
- this(config.hasPath(WHITELIST) ? config.getString(WHITELIST).toLowerCase() : "",
- config.hasPath(BLACKLIST) ? config.getString(BLACKLIST).toLowerCase() : "");
+ this(config.hasPath(WHITELIST) ? config.getString(WHITELIST) : "",
+ config.hasPath(BLACKLIST) ? config.getString(BLACKLIST) : "",
+ !config.hasPath(IGNORE_CASE) || config.getBoolean(IGNORE_CASE));
}
public WhitelistBlacklist(String whitelist, String blacklist) throws
IOException {
+ this(whitelist, blacklist, true);
+ }
+
+ public WhitelistBlacklist(String whitelist, String blacklist, boolean ignoreCase) throws IOException {
this.whitelistMultimap = HashMultimap.create();
this.blacklistMultimap = HashMultimap.create();
-
- populateMultimap(this.whitelistMultimap, whitelist.toLowerCase());
- populateMultimap(this.blacklistMultimap, blacklist.toLowerCase());
+ this.ignoreCase = ignoreCase;
+
+ if (ignoreCase) {
+ populateMultimap(this.whitelistMultimap, whitelist.toLowerCase());
+ populateMultimap(this.blacklistMultimap, blacklist.toLowerCase());
+ } else {
+ populateMultimap(this.whitelistMultimap, whitelist);
+ populateMultimap(this.blacklistMultimap, blacklist);
+ }
}
/**
@@ -83,15 +102,20 @@ public class WhitelistBlacklist implements Serializable {
* @return Whether the input table is accepted by this {@link
WhitelistBlacklist}.
*/
public boolean acceptTable(String db, String table) {
- return accept(db.toLowerCase(), table==null? Optional.<String> absent(): Optional.fromNullable(table.toLowerCase()));
+ return accept(db, table == null? Optional.<String> absent(): Optional.fromNullable(table));
}
private boolean accept(String db, Optional<String> table) {
- if (!this.blacklistMultimap.isEmpty() && multimapContains(this.blacklistMultimap, db, table, true)) {
+ String adjustedDb = ignoreCase ? db.toLowerCase() : db;
+ Optional<String> adjustedTable = ignoreCase && table.isPresent() ? Optional.of(table.get().toLowerCase()) : table;
+
+ if (!this.blacklistMultimap.isEmpty() &&
+ multimapContains(this.blacklistMultimap, adjustedDb, adjustedTable, true)) {
return false;
}
- return this.whitelistMultimap.isEmpty() || multimapContains(this.whitelistMultimap, db, table, false);
+ return this.whitelistMultimap.isEmpty() ||
+ multimapContains(this.whitelistMultimap, adjustedDb, adjustedTable, false);
}
private static void populateMultimap(SetMultimap<Pattern, Pattern> multimap,
String list) throws IOException {