This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d30aae  [GOBBLIN-1284] Fix flaky tests causing local build failures
0d30aae is described below

commit 0d30aaecee1e2eed5dfcfdbcbc8efba96d63ed7c
Author: suvasude <[email protected]>
AuthorDate: Mon Oct 12 20:07:27 2020 -0700

    [GOBBLIN-1284] Fix flaky tests causing local build failures
    
    Closes #3123 from
    sv2000/rateControlledFileSystemTest
---
 build.gradle                                       |  1 +
 .../management/conversion/hive/HiveSourceTest.java | 49 ++++++++++------------
 .../hive/LocalHiveMetastoreTestUtils.java          |  5 +++
 .../hive/converter/HiveAvroToOrcConverterTest.java | 24 ++++++-----
 .../src/test/resources/hive-site.xml               |  4 --
 .../gobblin/kafka/writer/Kafka09DataWriter.java    | 46 +++++++++++---------
 .../extract/kafka/KafkaSimpleStreamingTest.java    |  6 +--
 .../gobblin/runtime/HighLevelConsumerTest.java     | 26 ++++++++----
 .../kafka/writer/KafkaWriterConfigurationKeys.java |  2 +
 .../apache/gobblin/service/FlowConfigClient.java   |  2 +-
 .../apache/gobblin/service/FlowConfigV2Client.java |  2 +-
 .../runtime/kafka/MockedHighLevelConsumer.java     | 10 ++---
 .../util/RatedControlledFileSystemTest.java        | 10 +++--
 13 files changed, 105 insertions(+), 82 deletions(-)

diff --git a/build.gradle b/build.gradle
index 701aa0b..3b4b101 100644
--- a/build.gradle
+++ b/build.gradle
@@ -215,6 +215,7 @@ rat {
     '**/*.dat',
     '**/*.pem',
     '**/*.crc',
+    '**/*.gpg',
     '**/*.jst',
     '**/*.orc',
     '**/*.rc',
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/HiveSourceTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/HiveSourceTest.java
index eee2005..4a1da03 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/HiveSourceTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/HiveSourceTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.gobblin.data.management.conversion.hive;
 
+import java.io.File;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -31,6 +32,7 @@ import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.io.Files;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
@@ -45,28 +47,31 @@ import org.apache.gobblin.source.workunit.WorkUnit;
 
 @Test(groups = { "gobblin.data.management.conversion" })
 public class HiveSourceTest {
+  private static final String TEST_TABLE_1 = "testtable1";
+  private static final String TEST_TABLE_2 = "testtable2";
+  private static final String TEST_TABLE_3 = "testtable3";
 
   private LocalHiveMetastoreTestUtils hiveMetastoreTestUtils;
   private HiveSource hiveSource;
-
+  private File tmpDir;
   @BeforeClass
-  public void setup() throws Exception {
+  public void setup() {
     this.hiveMetastoreTestUtils = LocalHiveMetastoreTestUtils.getInstance();
     this.hiveSource = new HiveSource();
+    this.tmpDir = Files.createTempDir();
+    tmpDir.deleteOnExit();
   }
 
   @Test
   public void testGetWorkUnitsForTable() throws Exception {
-
     String dbName = "testdb2";
-    String tableName = "testtable2";
-    String tableSdLoc = "/tmp/testtable2";
+    String tableSdLoc = new File(this.tmpDir, TEST_TABLE_2).getAbsolutePath();
 
     this.hiveMetastoreTestUtils.getLocalMetastoreClient().dropDatabase(dbName, 
false, true, true);
 
     SourceState testState = getTestState(dbName);
 
-    this.hiveMetastoreTestUtils.createTestAvroTable(dbName, tableName, 
tableSdLoc, Optional.<String> absent());
+    this.hiveMetastoreTestUtils.createTestAvroTable(dbName, TEST_TABLE_2, 
tableSdLoc, Optional.<String> absent());
 
     List<WorkUnit> workUnits = hiveSource.getWorkunits(testState);
 
@@ -77,22 +82,20 @@ public class HiveSourceTest {
     HiveWorkUnit hwu = new HiveWorkUnit(wu);
 
     Assert.assertEquals(hwu.getHiveDataset().getDbAndTable().getDb(), dbName);
-    Assert.assertEquals(hwu.getHiveDataset().getDbAndTable().getTable(), 
tableName);
+    Assert.assertEquals(hwu.getHiveDataset().getDbAndTable().getTable(), 
TEST_TABLE_2);
     Assert.assertEquals(hwu.getTableSchemaUrl(), new Path("/tmp/dummy"));
   }
 
   @Test
   public void testGetWorkUnitsForPartitions() throws Exception {
-
     String dbName = "testdb3";
-    String tableName = "testtable3";
-    String tableSdLoc = "/tmp/testtable3";
+    String tableSdLoc = new File(this.tmpDir, TEST_TABLE_3).getAbsolutePath();
 
     this.hiveMetastoreTestUtils.getLocalMetastoreClient().dropDatabase(dbName, 
false, true, true);
 
     SourceState testState = getTestState(dbName);
 
-    Table tbl = this.hiveMetastoreTestUtils.createTestAvroTable(dbName, 
tableName, tableSdLoc, Optional.of("field"));
+    Table tbl = this.hiveMetastoreTestUtils.createTestAvroTable(dbName, 
TEST_TABLE_3, tableSdLoc, Optional.of("field"));
 
     this.hiveMetastoreTestUtils.addTestPartition(tbl, ImmutableList.of("f1"), 
(int) System.currentTimeMillis());
 
@@ -103,7 +106,7 @@ public class HiveSourceTest {
     WorkUnit wu = workUnits.get(0);
     WorkUnit wu2 = workUnits.get(1);
 
-    HiveWorkUnit hwu = null;
+    HiveWorkUnit hwu;
     if (!wu.contains(PartitionLevelWatermarker.IS_WATERMARK_WORKUNIT_KEY)) {
       hwu = new HiveWorkUnit(wu);
     } else {
@@ -111,29 +114,26 @@ public class HiveSourceTest {
     }
 
     Assert.assertEquals(hwu.getHiveDataset().getDbAndTable().getDb(), dbName);
-    Assert.assertEquals(hwu.getHiveDataset().getDbAndTable().getTable(), 
tableName);
+    Assert.assertEquals(hwu.getHiveDataset().getDbAndTable().getTable(), 
TEST_TABLE_3);
     Assert.assertEquals(hwu.getPartitionName().get(), "field=f1");
   }
 
   @Test
   public void testGetWorkunitsAfterWatermark() throws Exception {
-
     String dbName = "testdb4";
-    String tableName1 = "testtable1";
-    String tableSdLoc1 = "/tmp/testtable1";
-    String tableName2 = "testtable2";
-    String tableSdLoc2 = "/tmp/testtable2";
+    String tableSdLoc1 = new File(this.tmpDir, TEST_TABLE_1).getAbsolutePath();
+    String tableSdLoc2 = new File(this.tmpDir, TEST_TABLE_2).getAbsolutePath();
 
     this.hiveMetastoreTestUtils.getLocalMetastoreClient().dropDatabase(dbName, 
false, true, true);
 
-    this.hiveMetastoreTestUtils.createTestAvroTable(dbName, tableName1, 
tableSdLoc1, Optional.<String> absent());
-    this.hiveMetastoreTestUtils.createTestAvroTable(dbName, tableName2, 
tableSdLoc2, Optional.<String> absent(), true);
+    this.hiveMetastoreTestUtils.createTestAvroTable(dbName, TEST_TABLE_1, 
tableSdLoc1, Optional.<String> absent());
+    this.hiveMetastoreTestUtils.createTestAvroTable(dbName, TEST_TABLE_2, 
tableSdLoc2, Optional.<String> absent(), true);
 
     List<WorkUnitState> previousWorkUnitStates = Lists.newArrayList();
 
-    Table table1 = 
this.hiveMetastoreTestUtils.getLocalMetastoreClient().getTable(dbName, 
tableName1);
+    Table table1 = 
this.hiveMetastoreTestUtils.getLocalMetastoreClient().getTable(dbName, 
TEST_TABLE_1);
 
-    previousWorkUnitStates.add(ConversionHiveTestUtils.createWus(dbName, 
tableName1,
+    previousWorkUnitStates.add(ConversionHiveTestUtils.createWus(dbName, 
TEST_TABLE_1,
         TimeUnit.MILLISECONDS.convert(table1.getCreateTime(), 
TimeUnit.SECONDS)));
 
     SourceState testState = new SourceState(getTestState(dbName), 
previousWorkUnitStates);
@@ -147,12 +147,11 @@ public class HiveSourceTest {
     HiveWorkUnit hwu = new HiveWorkUnit(wu);
 
     Assert.assertEquals(hwu.getHiveDataset().getDbAndTable().getDb(), dbName);
-    Assert.assertEquals(hwu.getHiveDataset().getDbAndTable().getTable(), 
tableName2);
+    Assert.assertEquals(hwu.getHiveDataset().getDbAndTable().getTable(), 
TEST_TABLE_2);
   }
 
   @Test
   public void testShouldCreateWorkunitsOlderThanLookback() throws Exception {
-
     long currentTime = System.currentTimeMillis();
     long partitionCreateTime = new 
DateTime(currentTime).minusDays(35).getMillis();
 
@@ -170,7 +169,6 @@ public class HiveSourceTest {
 
   @Test
   public void testShouldCreateWorkunitsNewerThanLookback() throws Exception {
-
     long currentTime = System.currentTimeMillis();
     // Default lookback time is 3 days
     long partitionCreateTime = new 
DateTime(currentTime).minusDays(2).getMillis();
@@ -189,7 +187,6 @@ public class HiveSourceTest {
 
   @Test
   public void testIsOlderThanLookbackForDistcpGenerationTime() throws 
Exception {
-
     long currentTime = System.currentTimeMillis();
     // Default lookback time is 3 days
     long partitionCreateTime = new 
DateTime(currentTime).minusDays(2).getMillis();
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/LocalHiveMetastoreTestUtils.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/LocalHiveMetastoreTestUtils.java
index 5d684b4..42fc935 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/LocalHiveMetastoreTestUtils.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/LocalHiveMetastoreTestUtils.java
@@ -42,6 +42,7 @@ import org.apache.thrift.TException;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Files;
 
 import org.apache.gobblin.hive.HiveMetastoreClientPool;
 import org.apache.gobblin.hive.avro.HiveAvroSerDeManager;
@@ -62,6 +63,10 @@ public class LocalHiveMetastoreTestUtils {
     } catch (IOException e) {
       e.printStackTrace();
     }
+    File tmpDir = Files.createTempDir();
+    tmpDir.deleteOnExit();
+    Properties p = System.getProperties();
+    p.setProperty("derby.system.home", tmpDir.getAbsolutePath());
     this.localMetastoreClient =
         HiveMetastoreClientPool.get(new Properties(), 
Optional.<String>absent()).getClient().get();
   }
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/converter/HiveAvroToOrcConverterTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/converter/HiveAvroToOrcConverterTest.java
index 893e13c..8c7d96c 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/converter/HiveAvroToOrcConverterTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/converter/HiveAvroToOrcConverterTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.data.management.conversion.hive.converter;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.List;
 
@@ -32,6 +33,7 @@ import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import com.google.common.io.Files;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
@@ -48,11 +50,15 @@ import 
org.apache.gobblin.data.management.copy.hive.WhitelistBlacklist;
 
 @Test(groups = { "gobblin.data.management.conversion" })
 public class HiveAvroToOrcConverterTest {
+  private static final String TEST_TABLE = "testtable";
 
   private static String resourceDir = "hiveConverterTest";
   private LocalHiveMetastoreTestUtils hiveMetastoreTestUtils;
+  private File tmpDir;
 
   public HiveAvroToOrcConverterTest() {
+    this.tmpDir = Files.createTempDir();
+    tmpDir.deleteOnExit();
     this.hiveMetastoreTestUtils = LocalHiveMetastoreTestUtils.getInstance();
   }
 
@@ -63,21 +69,20 @@ public class HiveAvroToOrcConverterTest {
   @Test
   public void testFlattenSchemaDDLandDML() throws Exception {
     String dbName = "testdb";
-    String tableName = "testtable";
-    String tableSdLoc = "/tmp/testtable";
+    String tableSdLoc = new File(this.tmpDir, TEST_TABLE).getAbsolutePath();
 
     this.hiveMetastoreTestUtils.getLocalMetastoreClient().dropDatabase(dbName, 
false, true, true);
 
-    Table table = this.hiveMetastoreTestUtils.createTestAvroTable(dbName, 
tableName, tableSdLoc, Optional.<String> absent());
+    Table table = this.hiveMetastoreTestUtils.createTestAvroTable(dbName, 
TEST_TABLE, tableSdLoc, Optional.<String> absent());
     Schema schema = 
ConversionHiveTestUtils.readSchemaFromJsonFile(resourceDir, 
"recordWithinRecordWithinRecord_nested.json");
-    WorkUnitState wus = ConversionHiveTestUtils.createWus(dbName, tableName, 
0);
+    WorkUnitState wus = ConversionHiveTestUtils.createWus(dbName, TEST_TABLE, 
0);
 
     try (HiveAvroToFlattenedOrcConverter converter = new 
HiveAvroToFlattenedOrcConverter();) {
 
       Config config = ConfigFactory.parseMap(
           ImmutableMap.<String, String>builder().put("destinationFormats", 
"flattenedOrc")
               .put("flattenedOrc.destination.dbName", dbName)
-              .put("flattenedOrc.destination.tableName", tableName + "_orc")
+              .put("flattenedOrc.destination.tableName", TEST_TABLE + "_orc")
               .put("flattenedOrc.destination.dataPath", "file:" + tableSdLoc + 
"_orc").build());
 
       ConvertibleHiveDataset cd = 
ConvertibleHiveDatasetTest.createTestConvertibleDataset(config);
@@ -115,14 +120,13 @@ public class HiveAvroToOrcConverterTest {
   @Test
   public void testNestedSchemaDDLandDML() throws Exception {
     String dbName = "testdb";
-    String tableName = "testtable";
-    String tableSdLoc = "/tmp/testtable";
+    String tableSdLoc = new File(this.tmpDir, TEST_TABLE).getAbsolutePath();
 
     this.hiveMetastoreTestUtils.getLocalMetastoreClient().dropDatabase(dbName, 
false, true, true);
 
-    Table table = this.hiveMetastoreTestUtils.createTestAvroTable(dbName, 
tableName, tableSdLoc, Optional.<String> absent());
+    Table table = this.hiveMetastoreTestUtils.createTestAvroTable(dbName, 
TEST_TABLE, tableSdLoc, Optional.<String> absent());
     Schema schema = 
ConversionHiveTestUtils.readSchemaFromJsonFile(resourceDir, 
"recordWithinRecordWithinRecord_nested.json");
-    WorkUnitState wus = ConversionHiveTestUtils.createWus(dbName, tableName, 
0);
+    WorkUnitState wus = ConversionHiveTestUtils.createWus(dbName, TEST_TABLE, 
0);
     wus.getJobState().setProp("orc.table.flatten.schema", "false");
 
     try (HiveAvroToNestedOrcConverter converter = new 
HiveAvroToNestedOrcConverter();) {
@@ -131,7 +135,7 @@ public class HiveAvroToOrcConverterTest {
           .put("destinationFormats", "nestedOrc")
           .put("nestedOrc.destination.tableName","testtable_orc_nested")
           .put("nestedOrc.destination.dbName",dbName)
-          
.put("nestedOrc.destination.dataPath","file:/tmp/testtable_orc_nested")
+          .put("nestedOrc.destination.dataPath","file:" + tableSdLoc + 
"_orc_nested")
           .build());
 
       ConvertibleHiveDataset cd = 
ConvertibleHiveDatasetTest.createTestConvertibleDataset(config);
diff --git a/gobblin-data-management/src/test/resources/hive-site.xml 
b/gobblin-data-management/src/test/resources/hive-site.xml
index ae1b4cd..0cd205d 100644
--- a/gobblin-data-management/src/test/resources/hive-site.xml
+++ b/gobblin-data-management/src/test/resources/hive-site.xml
@@ -4,10 +4,6 @@
     <value>org.apache.derby.jdbc.EmbeddedDriver</value>
   </property>
   <property>
-    <name>javax.jdo.option.ConnectionURL</name>
-    
<value>jdbc:derby:;databaseName=/tmp/scratch/hive/metastore_db;create=true</value>
-  </property>
-  <property>
     <name>hive.metastore.warehouse.dir</name>
     <value>/tmp/scratch/hive/warehouse</value>
   </property>
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
 
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
index 3bef426..e8c8c73 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
@@ -153,34 +153,42 @@ public class Kafka09DataWriter<K, V> implements 
KafkaDataWriter<K, V> {
          this.producer.flush();
   }
 
-  private void provisionTopic(String topicName,Config config) {
+  private void provisionTopic(String topicName, Config config) {
     String zooKeeperPropKey = KafkaWriterConfigurationKeys.CLUSTER_ZOOKEEPER;
-    if(!config.hasPath(zooKeeperPropKey)) {
-     log.debug("Topic "+topicName+" is configured without the partition and 
replication");
-     return;
+    if (!config.hasPath(zooKeeperPropKey)) {
+      log.debug("Topic " + topicName + " is configured without the partition 
and replication");
+      return;
     }
     String zookeeperConnect = config.getString(zooKeeperPropKey);
-    int sessionTimeoutMs = ConfigUtils.getInt(config, 
KafkaWriterConfigurationKeys.ZOOKEEPER_SESSION_TIMEOUT, 
KafkaWriterConfigurationKeys.ZOOKEEPER_SESSION_TIMEOUT_DEFAULT);
-    int connectionTimeoutMs = ConfigUtils.getInt(config, 
KafkaWriterConfigurationKeys.ZOOKEEPER_CONNECTION_TIMEOUT, 
KafkaWriterConfigurationKeys.ZOOKEEPER_CONNECTION_TIMEOUT_DEFAULT);
+    int sessionTimeoutMs = ConfigUtils.getInt(config, 
KafkaWriterConfigurationKeys.ZOOKEEPER_SESSION_TIMEOUT,
+        KafkaWriterConfigurationKeys.ZOOKEEPER_SESSION_TIMEOUT_DEFAULT);
+    int connectionTimeoutMs = ConfigUtils.getInt(config, 
KafkaWriterConfigurationKeys.ZOOKEEPER_CONNECTION_TIMEOUT,
+        KafkaWriterConfigurationKeys.ZOOKEEPER_CONNECTION_TIMEOUT_DEFAULT);
     // Note: You must initialize the ZkClient with ZKStringSerializer.  If you 
don't, then
     // createTopic() will only seem to work (it will return without error).  
The topic will exist in
     // only ZooKeeper and will be returned when listing topics, but Kafka 
itself does not create the
     // topic.
-    ZkClient zkClient = new ZkClient(zookeeperConnect, sessionTimeoutMs, 
connectionTimeoutMs, ZKStringSerializer$.MODULE$);
+    ZkClient zkClient =
+        new ZkClient(zookeeperConnect, sessionTimeoutMs, connectionTimeoutMs, 
ZKStringSerializer$.MODULE$);
     // Security for Kafka was added in Kafka 0.9.0.0
     ZkUtils zkUtils = new ZkUtils(zkClient, new 
ZkConnection(zookeeperConnect), false);
-    int partitions = ConfigUtils.getInt(config, 
KafkaWriterConfigurationKeys.PARTITION_COUNT, 
KafkaWriterConfigurationKeys.PARTITION_COUNT_DEFAULT);
-    int replication = ConfigUtils.getInt(config, 
KafkaWriterConfigurationKeys.REPLICATION_COUNT, 
KafkaWriterConfigurationKeys.PARTITION_COUNT_DEFAULT);
+    int partitions = ConfigUtils.getInt(config, 
KafkaWriterConfigurationKeys.PARTITION_COUNT,
+        KafkaWriterConfigurationKeys.PARTITION_COUNT_DEFAULT);
+    int replication = ConfigUtils.getInt(config, 
KafkaWriterConfigurationKeys.REPLICATION_COUNT,
+        KafkaWriterConfigurationKeys.PARTITION_COUNT_DEFAULT);
     Properties topicConfig = new Properties();
-    if(AdminUtils.topicExists(zkUtils, topicName)) {
-          log.debug("Topic"+topicName+" already Exists with replication: 
"+replication+" and partitions :"+partitions);
-       return;
-    }
-    try {
-       AdminUtils.createTopic(zkUtils, topicName, partitions, replication, 
topicConfig);
-    } catch (RuntimeException e) {
-       throw new RuntimeException(e);
-    }
-       log.info("Created Topic "+topicName+" with replication: "+replication+" 
and partitions :"+partitions);
+    if (AdminUtils.topicExists(zkUtils, topicName)) {
+      log.debug("Topic {} already exists with replication: {} and partitions: 
{}", topicName, replication, partitions);
+      boolean deleteTopicIfExists = ConfigUtils.getBoolean(config, 
KafkaWriterConfigurationKeys.DELETE_TOPIC_IF_EXISTS,
+          KafkaWriterConfigurationKeys.DEFAULT_DELETE_TOPIC_IF_EXISTS);
+      if (!deleteTopicIfExists) {
+        return;
+      } else {
+        log.debug("Deleting topic {}", topicName);
+        AdminUtils.deleteTopic(zkUtils, topicName);
+      }
     }
+    AdminUtils.createTopic(zkUtils, topicName, partitions, replication, 
topicConfig);
+    log.info("Created topic {} with replication: {} and partitions : {}", 
topicName, replication, partitions);
+  }
 }
\ No newline at end of file
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/source/extractor/extract/kafka/KafkaSimpleStreamingTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/source/extractor/extract/kafka/KafkaSimpleStreamingTest.java
index db66a85..b3fceac 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/source/extractor/extract/kafka/KafkaSimpleStreamingTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/source/extractor/extract/kafka/KafkaSimpleStreamingTest.java
@@ -140,7 +140,7 @@ public class KafkaSimpleStreamingTest {
    * @throws InterruptedException
    * @throws DataRecordException
    */
-  @Test(timeOut = 10000)
+  @Test(timeOut = 30000)
   public void testExtractor()
       throws IOException, InterruptedException, DataRecordException {
     final String topic = "testSimpleStreamingExtractor";
@@ -213,7 +213,7 @@ public class KafkaSimpleStreamingTest {
    * original thread calls close on the extractor and verifies the waiting 
thread gets an expected exception and exits
    * as expected.
    */
-  @Test(timeOut = 10000)
+  @Test(timeOut = 30000)
   public void testThreadedExtractor() {
     final String topic = "testThreadedExtractor";
     final KafkaSimpleStreamingExtractor<String, byte[]> kSSE = 
getStreamingExtractor(topic);
@@ -247,7 +247,7 @@ public class KafkaSimpleStreamingTest {
   /**
    * Test that the extractor barfs on not calling start
    */
-  @Test(timeOut = 10000)
+  @Test(timeOut = 30000)
   public void testExtractorStart() {
 
     final String topic = "testExtractorStart";
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
index a54a75c..8a28bf2 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
@@ -27,6 +27,7 @@ import org.testng.annotations.BeforeSuite;
 import org.testng.annotations.Test;
 
 import com.google.api.client.util.Lists;
+import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.io.Closer;
 import com.google.common.io.Files;
@@ -50,11 +51,9 @@ import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.writer.AsyncDataWriter;
 import org.apache.gobblin.writer.WriteCallback;
 
-
 @Test
 @Slf4j
 public class HighLevelConsumerTest extends KafkaTestBase {
-
   private static final String BOOTSTRAP_SERVERS_KEY = "bootstrap.servers";
   private static final String KAFKA_AUTO_OFFSET_RESET_KEY = 
"auto.offset.reset";
   private static final String SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT = 
AbstractBaseKafkaConsumerClient.CONFIG_NAMESPACE + "." + 
AbstractBaseKafkaConsumerClient.CONSUMER_CONFIG + ".";
@@ -72,20 +71,25 @@ public class HighLevelConsumerTest extends KafkaTestBase {
   }
 
   @BeforeSuite
-  public void beforeSuite() throws Exception {
+  public void beforeSuite()
+      throws Exception {
     startServers();
     _closer = Closer.create();
     Properties producerProps = new Properties();
     producerProps.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, TOPIC);
-    
producerProps.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX
 + BOOTSTRAP_SERVERS_KEY, _kafkaBrokers);
-    
producerProps.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX
 + KafkaWriterConfigurationKeys.VALUE_SERIALIZER_CONFIG, 
"org.apache.kafka.common.serialization.ByteArraySerializer");
+    producerProps
+        .setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX 
+ BOOTSTRAP_SERVERS_KEY, _kafkaBrokers);
+    
producerProps.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX
+            + KafkaWriterConfigurationKeys.VALUE_SERIALIZER_CONFIG,
+        "org.apache.kafka.common.serialization.ByteArraySerializer");
     producerProps.setProperty(KafkaWriterConfigurationKeys.CLUSTER_ZOOKEEPER, 
this.getZkConnectString());
     producerProps.setProperty(KafkaWriterConfigurationKeys.PARTITION_COUNT, 
String.valueOf(NUM_PARTITIONS));
+    
producerProps.setProperty(KafkaWriterConfigurationKeys.DELETE_TOPIC_IF_EXISTS, 
String.valueOf(true));
     AsyncDataWriter<byte[]> dataWriter = _closer.register(new 
Kafka09DataWriter<byte[], byte[]>(producerProps));
 
     List<byte[]> records = createByteArrayMessages();
     WriteCallback mock = Mockito.mock(WriteCallback.class);
-    for(byte[] record : records) {
+    for (byte[] record : records) {
       dataWriter.write(record, mock);
     }
     dataWriter.flush();
@@ -114,12 +118,14 @@ public class HighLevelConsumerTest extends KafkaTestBase {
     consumerProps.setProperty(ConfigurationKeys.KAFKA_BROKERS, _kafkaBrokers);
     
consumerProps.setProperty(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY,
 "org.apache.kafka.common.serialization.ByteArrayDeserializer");
     consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + 
KAFKA_AUTO_OFFSET_RESET_KEY, "earliest");
+    //Generate a brand new consumer group id to ensure there are no previously 
committed offsets for this group id
+    String consumerGroupId = Joiner.on("-").join(TOPIC, "auto", 
System.currentTimeMillis());
+    consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + 
HighLevelConsumer.GROUP_ID_KEY, consumerGroupId);
     consumerProps.setProperty(HighLevelConsumer.ENABLE_AUTO_COMMIT_KEY, 
"true");
-
     MockedHighLevelConsumer consumer = new MockedHighLevelConsumer(TOPIC, 
ConfigUtils.propertiesToConfig(consumerProps), NUM_PARTITIONS);
     consumer.startAsync().awaitRunning();
 
-    consumer.awaitExactlyNMessages(NUM_MSGS, 5000);
+    consumer.awaitExactlyNMessages(NUM_MSGS, 10000);
     consumer.shutDown();
   }
 
@@ -129,7 +135,9 @@ public class HighLevelConsumerTest extends KafkaTestBase {
     consumerProps.setProperty(ConfigurationKeys.KAFKA_BROKERS, _kafkaBrokers);
     
consumerProps.setProperty(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY,
 "org.apache.kafka.common.serialization.ByteArrayDeserializer");
     consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + 
KAFKA_AUTO_OFFSET_RESET_KEY, "earliest");
-
+    //Generate a brand new consumer group id to ensure there are no previously 
committed offsets for this group id
+    String consumerGroupId = Joiner.on("-").join(TOPIC, "manual", 
System.currentTimeMillis());
+    consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + 
HighLevelConsumer.GROUP_ID_KEY, consumerGroupId);
     // Setting this to a second to make sure we are committing offsets 
frequently
     consumerProps.put(HighLevelConsumer.OFFSET_COMMIT_TIME_THRESHOLD_SECS_KEY, 
1);
 
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
index bb8681e..52af940 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
@@ -70,6 +70,8 @@ public class KafkaWriterConfigurationKeys {
   static final int REPLICATION_COUNT_DEFAULT = 1;
   public static final String PARTITION_COUNT = KAFKA_TOPIC_CONFIG + 
"partitionCount";
   static final int PARTITION_COUNT_DEFAULT = 1;
+  public static final String DELETE_TOPIC_IF_EXISTS = KAFKA_TOPIC_CONFIG + 
"deleteTopicIfExists";
+  static final Boolean DEFAULT_DELETE_TOPIC_IF_EXISTS = false;
   public static final String ZOOKEEPER_SESSION_TIMEOUT = CLUSTER_ZOOKEEPER + 
".sto";
   static final int ZOOKEEPER_SESSION_TIMEOUT_DEFAULT = 10000; // 10 seconds
   public static final String ZOOKEEPER_CONNECTION_TIMEOUT = CLUSTER_ZOOKEEPER 
+ ".cto";
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigClient.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigClient.java
index 58ae8d1..ae74f26 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigClient.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigClient.java
@@ -60,7 +60,7 @@ public class FlowConfigClient implements Closeable {
    * @param serverUri address and port of the REST server
    */
   public FlowConfigClient(String serverUri) {
-    this(serverUri, Collections.<String, String>emptyMap());
+    this(serverUri, Collections.emptyMap());
   }
 
   public FlowConfigClient(String serverUri, Map<String, String> properties) {
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
index 98b9e90..27dd79a 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
@@ -77,7 +77,7 @@ public class FlowConfigV2Client implements Closeable {
     LOG.debug("FlowConfigClient with serverUri " + serverUri);
 
     _httpClientFactory = Optional.of(new HttpClientFactory());
-    Client r2Client = new 
TransportClientAdapter(_httpClientFactory.get().getClient(Collections.<String, 
String>emptyMap()));
+    Client r2Client = new 
TransportClientAdapter(_httpClientFactory.get().getClient(properties));
     _restClient = Optional.of(new RestClient(r2Client, serverUri));
     _flowconfigsV2RequestBuilders = createRequestBuilders();
   }
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/MockedHighLevelConsumer.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/MockedHighLevelConsumer.java
index 520b354..36c0b6b 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/MockedHighLevelConsumer.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/MockedHighLevelConsumer.java
@@ -17,13 +17,13 @@
 
 package org.apache.gobblin.runtime.kafka;
 
-import java.util.List;
 import java.util.Map;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.stream.Collectors;
 
 import com.google.common.base.Predicate;
-import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 
 import javax.annotation.Nullable;
@@ -38,13 +38,13 @@ import org.apache.gobblin.testing.AssertWithBackoff;
 public class MockedHighLevelConsumer extends HighLevelConsumer<byte[], byte[]> 
{
 
   @Getter
-  private final List<byte[]> messages;
+  private final BlockingQueue<byte[]> messages;
   @Getter
   private final Map<KafkaPartition, Long> committedOffsets;
 
   public MockedHighLevelConsumer(String topic, Config config, int numThreads) {
     super(topic, config, numThreads);
-    this.messages = Lists.newArrayList();
+    this.messages = new LinkedBlockingQueue<>();
     this.committedOffsets = new ConcurrentHashMap<>();
   }
 
@@ -59,7 +59,7 @@ public class MockedHighLevelConsumer extends 
HighLevelConsumer<byte[], byte[]> {
 
   @Override
   protected void processMessage(DecodeableKafkaRecord<byte[], byte[]> message) 
{
-    this.messages.add(message.getValue());
+    this.messages.offer(message.getValue());
   }
 
   @Override
diff --git 
a/gobblin-utility/src/test/java/org/apache/gobblin/util/RatedControlledFileSystemTest.java
 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/RatedControlledFileSystemTest.java
index 4818b0a..0d2d849 100644
--- 
a/gobblin-utility/src/test/java/org/apache/gobblin/util/RatedControlledFileSystemTest.java
+++ 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/RatedControlledFileSystemTest.java
@@ -32,6 +32,8 @@ import org.testng.annotations.Test;
 import com.codahale.metrics.Meter;
 import com.google.common.math.DoubleMath;
 
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.util.limiter.Limiter;
 import org.apache.gobblin.util.limiter.RateBasedLimiter;
 
@@ -39,6 +41,7 @@ import org.apache.gobblin.util.limiter.RateBasedLimiter;
 /**
  * Unit tests for {@link RatedControlledFileSystem}.
  */
+@Slf4j
 @Test(groups = { "gobblin.util" })
 public class RatedControlledFileSystemTest {
 
@@ -69,16 +72,15 @@ public class RatedControlledFileSystemTest {
   }
 
   @Test
-  public void testFsOperation() throws IOException, InterruptedException {
+  public void testFsOperation() throws IOException {
     Meter meter = new Meter();
     Path fakePath = new Path("fakePath");
-    for (int i = 0; i < 100; i++) {
+    for (int i = 0; i < 500; i++) {
       Assert.assertFalse(this.rateControlledFs.exists(fakePath));
       meter.mark();
-      Thread.sleep((RANDOM.nextInt() & Integer.MAX_VALUE) % 10);
     }
     // Assert a fuzzy equal with 5% of tolerance
-    Assert.assertTrue(DoubleMath.fuzzyEquals(meter.getMeanRate(), 20d, 20d * 
0.05));
+    Assert.assertTrue(DoubleMath.fuzzyEquals(meter.getMeanRate(), 20d, 20d * 
0.10));
   }
 
   @AfterClass

Reply via email to