This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 21dd436  [GOBBLIN-1233][GOBBLIN-831] Add case-aware support in WhitelistBlacklist and other small fixes
21dd436 is described below

commit 21dd43619ba769143f9afa87ca89b37dacd67756
Author: zhchen <[email protected]>
AuthorDate: Fri Aug 14 16:24:41 2020 -0700

    [GOBBLIN-1233][GOBBLIN-831] Add case-aware support in WhitelistBlacklist and other small fixes
    
    Closes #3078 from zxcware/misc
---
 .../cluster/GobblinHelixJobLauncherTest.java       |  1 +
 .../partitioner/TimeBasedWriterPartitioner.java    |  3 ++
 .../runtime/retention/DatasetCleanerTask.java      | 11 +++++
 .../copy/hive/WhitelistBlacklistTest.java          | 24 +++++++++-
 .../org/apache/gobblin/writer/AsyncHttpWriter.java |  2 +-
 .../apache/gobblin/writer/AsyncHttpWriterTest.java |  2 +-
 .../packer/KafkaBiLevelWorkUnitPacker.java         |  4 ++
 .../packer/KafkaSingleLevelWorkUnitPacker.java     |  4 ++
 .../kafka/workunit/packer/KafkaWorkUnitPacker.java | 10 ++--
 .../workunit/packer/KafkaWorkUnitPackerTest.java   | 16 +++++++
 .../management/copy/hive/WhitelistBlacklist.java   | 56 +++++++++++++++-------
 11 files changed, 110 insertions(+), 23 deletions(-)

diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
index 1f98807..7625540 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
@@ -286,6 +286,7 @@ public class GobblinHelixJobLauncherTest {
     Assert.assertEquals(testListener.getCompletes().get() == 1, true);
   }
 
+  @Test(dependsOnMethods = {"testLaunchJob", "testLaunchMultipleJobs"})
   public void testJobCleanup() throws Exception {
     final ConcurrentHashMap<String, Boolean> runningMap = new ConcurrentHashMap<>();
 
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedWriterPartitioner.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedWriterPartitioner.java
index 5ec2c10..2c84a32 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedWriterPartitioner.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/partitioner/TimeBasedWriterPartitioner.java
@@ -36,6 +36,8 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 
+import lombok.Getter;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.util.DatePartitionType;
@@ -74,6 +76,7 @@ public abstract class TimeBasedWriterPartitioner<D> implements WriterPartitioner
   private final String writerPartitionSuffix;
   private final DatePartitionType granularity;
   private final DateTimeZone timeZone;
+  @Getter
   protected final TimeUnit timeUnit;
   private final Optional<DateTimeFormatter> timestampToPathFormatter;
   private final Schema schema;
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/retention/DatasetCleanerTask.java b/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/retention/DatasetCleanerTask.java
index b46e17c..4c5cbd2 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/retention/DatasetCleanerTask.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/retention/DatasetCleanerTask.java
@@ -22,6 +22,9 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.data.management.retention.DatasetCleaner;
 import org.apache.gobblin.runtime.TaskContext;
 import org.apache.gobblin.runtime.task.BaseAbstractTask;
@@ -30,6 +33,7 @@ import org.apache.gobblin.runtime.task.BaseAbstractTask;
 /**
  * A task that runs a DatasetCleaner job.
  */
+@Slf4j
 public class DatasetCleanerTask extends BaseAbstractTask {
 
   private static final String JOB_CONFIGURATION_PREFIX = "datasetCleaner";
@@ -47,8 +51,15 @@ public class DatasetCleanerTask extends BaseAbstractTask {
       DatasetCleaner datasetCleaner = new DatasetCleaner(FileSystem.get(new Configuration()),
           this.taskContext.getTaskState().getProperties());
       datasetCleaner.clean();
+      this.workingState = WorkUnitState.WorkingState.SUCCESSFUL;
     } catch (IOException e) {
+      this.workingState = WorkUnitState.WorkingState.FAILED;
       throw new RuntimeException(e);
     }
   }
+
+  @Override
+  public void commit() {
+    log.info("task {} commits with state {}", this.taskContext.getTaskState().getTaskId(), this.workingState);
+  }
 }
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/WhitelistBlacklistTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/WhitelistBlacklistTest.java
index 564cdf2..e5f056a 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/WhitelistBlacklistTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/WhitelistBlacklistTest.java
@@ -17,10 +17,15 @@
 
 package org.apache.gobblin.data.management.copy.hive;
 
+import java.util.Map;
+
 import org.testng.annotations.Test;
 
 import org.testng.Assert;
 
+import com.google.common.collect.Maps;
+import com.typesafe.config.ConfigFactory;
+
 
 public class WhitelistBlacklistTest {
 
@@ -159,8 +164,25 @@ public class WhitelistBlacklistTest {
   }
 
   @Test
+  public void testCaseAware() throws Exception {
+    WhitelistBlacklist whitelistBlacklist = new WhitelistBlacklist("dB*.Table*", "*.Tablea", false);
+    Assert.assertTrue(whitelistBlacklist.acceptDb("dBa"));
+    Assert.assertTrue(whitelistBlacklist.acceptDb("dBb"));
+    Assert.assertFalse(whitelistBlacklist.acceptDb("dbb"));
+
+    Assert.assertFalse(whitelistBlacklist.acceptTable("dBa", "Tablea"));
+    Assert.assertTrue(whitelistBlacklist.acceptTable("dBa", "Tableb"));
+    Assert.assertFalse(whitelistBlacklist.acceptTable("dbb", "Tableb"));
+    Assert.assertFalse(whitelistBlacklist.acceptTable("dBb", "tableb"));
+  }
+
+  @Test
   public void testWhitelistBlacklist() throws Exception {
-    WhitelistBlacklist whitelistBlacklist = new WhitelistBlacklist("dba", "dba.tablea");
+    Map<String, String> configMap = Maps.newHashMap();
+    configMap.put("whitelist", "dba");
+    configMap.put("blacklist", "dba.tablea");
+    WhitelistBlacklist whitelistBlacklist = new WhitelistBlacklist(ConfigFactory.parseMap(configMap));
+
     Assert.assertTrue(whitelistBlacklist.acceptDb("dba"));
     Assert.assertFalse(whitelistBlacklist.acceptDb("dbb"));
 
diff --git a/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/writer/AsyncHttpWriter.java b/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/writer/AsyncHttpWriter.java
index 6c1b105..6e8b9a2 100644
--- a/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/writer/AsyncHttpWriter.java
+++ b/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/writer/AsyncHttpWriter.java
@@ -83,7 +83,7 @@ public class AsyncHttpWriter<D, RQ, RP> extends AbstractAsyncDataWriter<D> {
     while (attempt < maxAttempts) {
       try {
           response = httpClient.sendRequest(rawRequest);
-      } catch (IOException e) {
+      } catch (Exception e) {
         // Retry
         attempt++;
         if (attempt == maxAttempts) {
diff --git a/gobblin-modules/gobblin-http/src/test/java/org/apache/gobblin/writer/AsyncHttpWriterTest.java b/gobblin-modules/gobblin-http/src/test/java/org/apache/gobblin/writer/AsyncHttpWriterTest.java
index 011c7f7..4962529 100644
--- a/gobblin-modules/gobblin-http/src/test/java/org/apache/gobblin/writer/AsyncHttpWriterTest.java
+++ b/gobblin-modules/gobblin-http/src/test/java/org/apache/gobblin/writer/AsyncHttpWriterTest.java
@@ -234,7 +234,7 @@ public class AsyncHttpWriterTest {
         // We won't consume the response anyway
         return null;
       }
-      throw new IOException("Send failed");
+      throw new RuntimeException("Send failed");
     }
 
     @Override
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
index 85ba232..e86c43e 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
@@ -64,6 +64,10 @@ public class KafkaBiLevelWorkUnitPacker extends KafkaWorkUnitPacker {
 
   @Override
   public List<WorkUnit> pack(Map<String, List<WorkUnit>> workUnitsByTopic, int numContainers) {
+    if (workUnitsByTopic == null || workUnitsByTopic.isEmpty()) {
+      return Lists.newArrayList();
+    }
+
     double totalEstDataSize = setWorkUnitEstSizes(workUnitsByTopic);
     double avgGroupSize = totalEstDataSize / numContainers / getPreGroupingSizeFactor(this.state);
 
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaSingleLevelWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaSingleLevelWorkUnitPacker.java
index 8c71a6a..1025139 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaSingleLevelWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaSingleLevelWorkUnitPacker.java
@@ -49,6 +49,10 @@ public class KafkaSingleLevelWorkUnitPacker extends KafkaWorkUnitPacker {
 
   @Override
   public List<WorkUnit> pack(Map<String, List<WorkUnit>> workUnitsByTopic, int numContainers) {
+    if (workUnitsByTopic == null || workUnitsByTopic.isEmpty()) {
+      return Lists.newArrayList();
+    }
+
     setWorkUnitEstSizes(workUnitsByTopic);
     List<WorkUnit> workUnits = Lists.newArrayList();
     for (List<WorkUnit> workUnitsForTopic : workUnitsByTopic.values()) {
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
index 1d34aec..786fe2a 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
@@ -329,15 +329,17 @@ public abstract class KafkaWorkUnitPacker {
       pQueue.add(lightestMultiWorkUnit);
     }
     LinkedList<MultiWorkUnit> pQueue_filtered = new LinkedList();
-    while(!pQueue.isEmpty())
-    {
+    while(!pQueue.isEmpty()) {
       MultiWorkUnit multiWorkUnit = pQueue.poll();
-      if(multiWorkUnit.getWorkUnits().size() != 0)
-      {
+      if(multiWorkUnit.getWorkUnits().size() != 0) {
         pQueue_filtered.offer(multiWorkUnit);
       }
     }
 
+    if (pQueue_filtered.isEmpty()) {
+      return Lists.newArrayList();
+    }
+
     logMultiWorkUnitInfo(pQueue_filtered);
     double minLoad = getWorkUnitEstLoad(pQueue_filtered.peekFirst());
     double maxLoad = getWorkUnitEstLoad(pQueue_filtered.peekLast());
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPackerTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPackerTest.java
index 89a81d0..4317c89 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPackerTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPackerTest.java
@@ -25,6 +25,8 @@ import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.Maps;
+
 import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.source.extractor.extract.AbstractSource;
 import org.apache.gobblin.source.workunit.WorkUnit;
@@ -66,6 +68,20 @@ public class KafkaWorkUnitPackerTest {
     Assert.assertTrue(anotherPacker instanceof KafkaSingleLevelWorkUnitPacker);
   }
 
+  @Test
+  public void testPackEmptyWorkUnit() {
+    SourceState sourceState = new SourceState(state);
+    Map<String, List<WorkUnit>> emptyWorkUnit = Maps.newHashMap();
+    // Test single level packer
+    KafkaWorkUnitPacker mypacker = KafkaWorkUnitPacker.getInstance(source, sourceState);
+    Assert.assertEquals(mypacker.pack(emptyWorkUnit, 1).size(), 0);
+    // Test bi level packer
+    sourceState.setProp(KAFKA_WORKUNIT_PACKER_CUSTOMIZED_TYPE,
+        "org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaBiLevelWorkUnitPacker");
+    mypacker = KafkaWorkUnitPacker.getInstance(source, sourceState);
+    Assert.assertEquals(mypacker.pack(emptyWorkUnit, 1).size(), 0);
+  }
+
   public class TestKafkaWorkUnitPacker extends KafkaWorkUnitPacker {
     public TestKafkaWorkUnitPacker(AbstractSource<?, ?> source, SourceState state) {
       super(source, state);
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/data/management/copy/hive/WhitelistBlacklist.java b/gobblin-utility/src/main/java/org/apache/gobblin/data/management/copy/hive/WhitelistBlacklist.java
index 346110c..c5f7246 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/data/management/copy/hive/WhitelistBlacklist.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/data/management/copy/hive/WhitelistBlacklist.java
@@ -33,43 +33,62 @@ import com.typesafe.config.Config;
 
 
 /**
- * A whitelist / blacklist implementation for filtering Hive tables. Parses input whitelist and blacklist of the form
+ * A whitelist / blacklist implementation for filtering Hive tables. It can be configured as
+ * case-insensitive({@code ignoreCase = true}) or case-sensitive({@code ignoreCase = false}). By default, it's
+ * case-insensitive. <br>
+ *
+ * <p></p>
+ * Parses input whitelist and blacklist of the form
  * [dbpattern.tablepattern1|tablepattern2|...],... and filters accordingly. The db and table patterns accept "*"
  * characters. Each of whitelist and blacklist is a list of patterns. For a table to be accepted, it must fail the
  * blacklist filter and pass the whitelist filter. Empty whitelist or blacklist are noops.
  *
+ * <p></p>
  * <p>
  *   Example whitelist and blacklist patterns:
- *   * db1.table1 -> only db1.table1 passes.
- *   * db1 -> any table under db1 passes.
- *   * db1.table* -> any table under db1 whose name satisfies the pattern table* passes.
- *   * db* -> all tables from all databases whose names satisfy the pattern db* pass.
- *   * db*.table* -> db and table must satisfy the patterns db* and table* respectively
- *   * db1.table1,db2.table2 -> combine expressions for different databases with comma.
- *   * db1.table1|table2 -> combine expressions for same database with "|".
+ *   <li> db1.table1 -> only db1.table1 passes.
+ *   <li> db1 -> any table under db1 passes.
+ *   <li> db1.table* -> any table under db1 whose name satisfies the pattern table* passes.
+ *   <li> db* -> all tables from all databases whose names satisfy the pattern db* pass.
+ *   <li> db*.table* -> db and table must satisfy the patterns db* and table* respectively
+ *   <li> db1.table1,db2.table2 -> combine expressions for different databases with comma.
+ *   <li> db1.table1|table2 -> combine expressions for same database with "|".
  * </p>
  */
 public class WhitelistBlacklist implements Serializable {
 
   public static final String WHITELIST = "whitelist";
   public static final String BLACKLIST = "blacklist";
+  public static final String IGNORE_CASE = "whitelistBlacklist.ignoreCase";
 
   private static final Pattern ALL_TABLES = Pattern.compile(".*");
 
   private final SetMultimap<Pattern, Pattern> whitelistMultimap;
   private final SetMultimap<Pattern, Pattern> blacklistMultimap;
+  private final boolean ignoreCase;
 
   public WhitelistBlacklist(Config config) throws IOException {
-    this(config.hasPath(WHITELIST) ? config.getString(WHITELIST).toLowerCase() : "",
-        config.hasPath(BLACKLIST) ? config.getString(BLACKLIST).toLowerCase() : "");
+    this(config.hasPath(WHITELIST) ? config.getString(WHITELIST) : "",
+        config.hasPath(BLACKLIST) ? config.getString(BLACKLIST) : "",
+        !config.hasPath(IGNORE_CASE) || config.getBoolean(IGNORE_CASE));
   }
 
   public WhitelistBlacklist(String whitelist, String blacklist) throws IOException {
+    this(whitelist, blacklist, true);
+  }
+
+  public WhitelistBlacklist(String whitelist, String blacklist, boolean ignoreCase) throws IOException {
     this.whitelistMultimap = HashMultimap.create();
     this.blacklistMultimap = HashMultimap.create();
-
-    populateMultimap(this.whitelistMultimap, whitelist.toLowerCase());
-    populateMultimap(this.blacklistMultimap, blacklist.toLowerCase());
+    this.ignoreCase = ignoreCase;
+
+    if (ignoreCase) {
+      populateMultimap(this.whitelistMultimap, whitelist.toLowerCase());
+      populateMultimap(this.blacklistMultimap, blacklist.toLowerCase());
+    } else {
+      populateMultimap(this.whitelistMultimap, whitelist);
+      populateMultimap(this.blacklistMultimap, blacklist);
+    }
   }
 
   /**
@@ -83,15 +102,20 @@ public class WhitelistBlacklist implements Serializable {
    * @return Whether the input table is accepted by this {@link WhitelistBlacklist}.
    */
   public boolean acceptTable(String db, String table) {
-    return accept(db.toLowerCase(), table==null? Optional.<String> absent(): Optional.fromNullable(table.toLowerCase()));
+    return accept(db, table == null? Optional.<String> absent(): Optional.fromNullable(table));
   }
 
   private boolean accept(String db, Optional<String> table) {
-    if (!this.blacklistMultimap.isEmpty() && multimapContains(this.blacklistMultimap, db, table, true)) {
+    String adjustedDb = ignoreCase ? db.toLowerCase() : db;
+    Optional<String> adjustedTable = ignoreCase && table.isPresent() ? Optional.of(table.get().toLowerCase()) : table;
+
+    if (!this.blacklistMultimap.isEmpty() &&
+        multimapContains(this.blacklistMultimap, adjustedDb, adjustedTable, true)) {
       return false;
     }
 
-    return this.whitelistMultimap.isEmpty() || multimapContains(this.whitelistMultimap, db, table, false);
+    return this.whitelistMultimap.isEmpty() ||
+        multimapContains(this.whitelistMultimap, adjustedDb, adjustedTable, false);
   }
 
   private static void populateMultimap(SetMultimap<Pattern, Pattern> multimap, String list) throws IOException {

Reply via email to