This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 0d30aae [GOBBLIN-1284] Fix flaky tests causing local build failures
0d30aae is described below
commit 0d30aaecee1e2eed5dfcfdbcbc8efba96d63ed7c
Author: suvasude <[email protected]>
AuthorDate: Mon Oct 12 20:07:27 2020 -0700
[GOBBLIN-1284] Fix flaky tests causing local build failures
Closes #3123 from sv2000/rateControlledFileSystemTest
---
build.gradle | 1 +
.../management/conversion/hive/HiveSourceTest.java | 49 ++++++++++------------
.../hive/LocalHiveMetastoreTestUtils.java | 5 +++
.../hive/converter/HiveAvroToOrcConverterTest.java | 24 ++++++-----
.../src/test/resources/hive-site.xml | 4 --
.../gobblin/kafka/writer/Kafka09DataWriter.java | 46 +++++++++++---------
.../extract/kafka/KafkaSimpleStreamingTest.java | 6 +--
.../gobblin/runtime/HighLevelConsumerTest.java | 26 ++++++++----
.../kafka/writer/KafkaWriterConfigurationKeys.java | 2 +
.../apache/gobblin/service/FlowConfigClient.java | 2 +-
.../apache/gobblin/service/FlowConfigV2Client.java | 2 +-
.../runtime/kafka/MockedHighLevelConsumer.java | 10 ++---
.../util/RatedControlledFileSystemTest.java | 10 +++--
13 files changed, 105 insertions(+), 82 deletions(-)
diff --git a/build.gradle b/build.gradle
index 701aa0b..3b4b101 100644
--- a/build.gradle
+++ b/build.gradle
@@ -215,6 +215,7 @@ rat {
'**/*.dat',
'**/*.pem',
'**/*.crc',
+ '**/*.gpg',
'**/*.jst',
'**/*.orc',
'**/*.rc',
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/HiveSourceTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/HiveSourceTest.java
index eee2005..4a1da03 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/HiveSourceTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/HiveSourceTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.gobblin.data.management.conversion.hive;
+import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -31,6 +32,7 @@ import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.io.Files;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
@@ -45,28 +47,31 @@ import org.apache.gobblin.source.workunit.WorkUnit;
@Test(groups = { "gobblin.data.management.conversion" })
public class HiveSourceTest {
+ private static final String TEST_TABLE_1 = "testtable1";
+ private static final String TEST_TABLE_2 = "testtable2";
+ private static final String TEST_TABLE_3 = "testtable3";
private LocalHiveMetastoreTestUtils hiveMetastoreTestUtils;
private HiveSource hiveSource;
-
+ private File tmpDir;
@BeforeClass
- public void setup() throws Exception {
+ public void setup() {
this.hiveMetastoreTestUtils = LocalHiveMetastoreTestUtils.getInstance();
this.hiveSource = new HiveSource();
+ this.tmpDir = Files.createTempDir();
+ tmpDir.deleteOnExit();
}
@Test
public void testGetWorkUnitsForTable() throws Exception {
-
String dbName = "testdb2";
- String tableName = "testtable2";
- String tableSdLoc = "/tmp/testtable2";
+ String tableSdLoc = new File(this.tmpDir, TEST_TABLE_2).getAbsolutePath();
this.hiveMetastoreTestUtils.getLocalMetastoreClient().dropDatabase(dbName,
false, true, true);
SourceState testState = getTestState(dbName);
- this.hiveMetastoreTestUtils.createTestAvroTable(dbName, tableName,
tableSdLoc, Optional.<String> absent());
+ this.hiveMetastoreTestUtils.createTestAvroTable(dbName, TEST_TABLE_2,
tableSdLoc, Optional.<String> absent());
List<WorkUnit> workUnits = hiveSource.getWorkunits(testState);
@@ -77,22 +82,20 @@ public class HiveSourceTest {
HiveWorkUnit hwu = new HiveWorkUnit(wu);
Assert.assertEquals(hwu.getHiveDataset().getDbAndTable().getDb(), dbName);
- Assert.assertEquals(hwu.getHiveDataset().getDbAndTable().getTable(),
tableName);
+ Assert.assertEquals(hwu.getHiveDataset().getDbAndTable().getTable(),
TEST_TABLE_2);
Assert.assertEquals(hwu.getTableSchemaUrl(), new Path("/tmp/dummy"));
}
@Test
public void testGetWorkUnitsForPartitions() throws Exception {
-
String dbName = "testdb3";
- String tableName = "testtable3";
- String tableSdLoc = "/tmp/testtable3";
+ String tableSdLoc = new File(this.tmpDir, TEST_TABLE_3).getAbsolutePath();
this.hiveMetastoreTestUtils.getLocalMetastoreClient().dropDatabase(dbName,
false, true, true);
SourceState testState = getTestState(dbName);
- Table tbl = this.hiveMetastoreTestUtils.createTestAvroTable(dbName,
tableName, tableSdLoc, Optional.of("field"));
+ Table tbl = this.hiveMetastoreTestUtils.createTestAvroTable(dbName,
TEST_TABLE_3, tableSdLoc, Optional.of("field"));
this.hiveMetastoreTestUtils.addTestPartition(tbl, ImmutableList.of("f1"),
(int) System.currentTimeMillis());
@@ -103,7 +106,7 @@ public class HiveSourceTest {
WorkUnit wu = workUnits.get(0);
WorkUnit wu2 = workUnits.get(1);
- HiveWorkUnit hwu = null;
+ HiveWorkUnit hwu;
if (!wu.contains(PartitionLevelWatermarker.IS_WATERMARK_WORKUNIT_KEY)) {
hwu = new HiveWorkUnit(wu);
} else {
@@ -111,29 +114,26 @@ public class HiveSourceTest {
}
Assert.assertEquals(hwu.getHiveDataset().getDbAndTable().getDb(), dbName);
- Assert.assertEquals(hwu.getHiveDataset().getDbAndTable().getTable(),
tableName);
+ Assert.assertEquals(hwu.getHiveDataset().getDbAndTable().getTable(),
TEST_TABLE_3);
Assert.assertEquals(hwu.getPartitionName().get(), "field=f1");
}
@Test
public void testGetWorkunitsAfterWatermark() throws Exception {
-
String dbName = "testdb4";
- String tableName1 = "testtable1";
- String tableSdLoc1 = "/tmp/testtable1";
- String tableName2 = "testtable2";
- String tableSdLoc2 = "/tmp/testtable2";
+ String tableSdLoc1 = new File(this.tmpDir, TEST_TABLE_1).getAbsolutePath();
+ String tableSdLoc2 = new File(this.tmpDir, TEST_TABLE_2).getAbsolutePath();
this.hiveMetastoreTestUtils.getLocalMetastoreClient().dropDatabase(dbName,
false, true, true);
- this.hiveMetastoreTestUtils.createTestAvroTable(dbName, tableName1,
tableSdLoc1, Optional.<String> absent());
- this.hiveMetastoreTestUtils.createTestAvroTable(dbName, tableName2,
tableSdLoc2, Optional.<String> absent(), true);
+ this.hiveMetastoreTestUtils.createTestAvroTable(dbName, TEST_TABLE_1,
tableSdLoc1, Optional.<String> absent());
+ this.hiveMetastoreTestUtils.createTestAvroTable(dbName, TEST_TABLE_2,
tableSdLoc2, Optional.<String> absent(), true);
List<WorkUnitState> previousWorkUnitStates = Lists.newArrayList();
- Table table1 =
this.hiveMetastoreTestUtils.getLocalMetastoreClient().getTable(dbName,
tableName1);
+ Table table1 =
this.hiveMetastoreTestUtils.getLocalMetastoreClient().getTable(dbName,
TEST_TABLE_1);
- previousWorkUnitStates.add(ConversionHiveTestUtils.createWus(dbName,
tableName1,
+ previousWorkUnitStates.add(ConversionHiveTestUtils.createWus(dbName,
TEST_TABLE_1,
TimeUnit.MILLISECONDS.convert(table1.getCreateTime(),
TimeUnit.SECONDS)));
SourceState testState = new SourceState(getTestState(dbName),
previousWorkUnitStates);
@@ -147,12 +147,11 @@ public class HiveSourceTest {
HiveWorkUnit hwu = new HiveWorkUnit(wu);
Assert.assertEquals(hwu.getHiveDataset().getDbAndTable().getDb(), dbName);
- Assert.assertEquals(hwu.getHiveDataset().getDbAndTable().getTable(),
tableName2);
+ Assert.assertEquals(hwu.getHiveDataset().getDbAndTable().getTable(),
TEST_TABLE_2);
}
@Test
public void testShouldCreateWorkunitsOlderThanLookback() throws Exception {
-
long currentTime = System.currentTimeMillis();
long partitionCreateTime = new
DateTime(currentTime).minusDays(35).getMillis();
@@ -170,7 +169,6 @@ public class HiveSourceTest {
@Test
public void testShouldCreateWorkunitsNewerThanLookback() throws Exception {
-
long currentTime = System.currentTimeMillis();
// Default lookback time is 3 days
long partitionCreateTime = new
DateTime(currentTime).minusDays(2).getMillis();
@@ -189,7 +187,6 @@ public class HiveSourceTest {
@Test
public void testIsOlderThanLookbackForDistcpGenerationTime() throws
Exception {
-
long currentTime = System.currentTimeMillis();
// Default lookback time is 3 days
long partitionCreateTime = new
DateTime(currentTime).minusDays(2).getMillis();
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/LocalHiveMetastoreTestUtils.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/LocalHiveMetastoreTestUtils.java
index 5d684b4..42fc935 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/LocalHiveMetastoreTestUtils.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/LocalHiveMetastoreTestUtils.java
@@ -42,6 +42,7 @@ import org.apache.thrift.TException;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Files;
import org.apache.gobblin.hive.HiveMetastoreClientPool;
import org.apache.gobblin.hive.avro.HiveAvroSerDeManager;
@@ -62,6 +63,10 @@ public class LocalHiveMetastoreTestUtils {
} catch (IOException e) {
e.printStackTrace();
}
+ File tmpDir = Files.createTempDir();
+ tmpDir.deleteOnExit();
+ Properties p = System.getProperties();
+ p.setProperty("derby.system.home", tmpDir.getAbsolutePath());
this.localMetastoreClient =
HiveMetastoreClientPool.get(new Properties(),
Optional.<String>absent()).getClient().get();
}
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/converter/HiveAvroToOrcConverterTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/converter/HiveAvroToOrcConverterTest.java
index 893e13c..8c7d96c 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/converter/HiveAvroToOrcConverterTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/converter/HiveAvroToOrcConverterTest.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.data.management.conversion.hive.converter;
+import java.io.File;
import java.io.IOException;
import java.util.List;
@@ -32,6 +33,7 @@ import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
+import com.google.common.io.Files;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
@@ -48,11 +50,15 @@ import
org.apache.gobblin.data.management.copy.hive.WhitelistBlacklist;
@Test(groups = { "gobblin.data.management.conversion" })
public class HiveAvroToOrcConverterTest {
+ private static final String TEST_TABLE = "testtable";
private static String resourceDir = "hiveConverterTest";
private LocalHiveMetastoreTestUtils hiveMetastoreTestUtils;
+ private File tmpDir;
public HiveAvroToOrcConverterTest() {
+ this.tmpDir = Files.createTempDir();
+ tmpDir.deleteOnExit();
this.hiveMetastoreTestUtils = LocalHiveMetastoreTestUtils.getInstance();
}
@@ -63,21 +69,20 @@ public class HiveAvroToOrcConverterTest {
@Test
public void testFlattenSchemaDDLandDML() throws Exception {
String dbName = "testdb";
- String tableName = "testtable";
- String tableSdLoc = "/tmp/testtable";
+ String tableSdLoc = new File(this.tmpDir, TEST_TABLE).getAbsolutePath();
this.hiveMetastoreTestUtils.getLocalMetastoreClient().dropDatabase(dbName,
false, true, true);
- Table table = this.hiveMetastoreTestUtils.createTestAvroTable(dbName,
tableName, tableSdLoc, Optional.<String> absent());
+ Table table = this.hiveMetastoreTestUtils.createTestAvroTable(dbName,
TEST_TABLE, tableSdLoc, Optional.<String> absent());
Schema schema =
ConversionHiveTestUtils.readSchemaFromJsonFile(resourceDir,
"recordWithinRecordWithinRecord_nested.json");
- WorkUnitState wus = ConversionHiveTestUtils.createWus(dbName, tableName,
0);
+ WorkUnitState wus = ConversionHiveTestUtils.createWus(dbName, TEST_TABLE,
0);
try (HiveAvroToFlattenedOrcConverter converter = new
HiveAvroToFlattenedOrcConverter();) {
Config config = ConfigFactory.parseMap(
ImmutableMap.<String, String>builder().put("destinationFormats",
"flattenedOrc")
.put("flattenedOrc.destination.dbName", dbName)
- .put("flattenedOrc.destination.tableName", tableName + "_orc")
+ .put("flattenedOrc.destination.tableName", TEST_TABLE + "_orc")
.put("flattenedOrc.destination.dataPath", "file:" + tableSdLoc +
"_orc").build());
ConvertibleHiveDataset cd =
ConvertibleHiveDatasetTest.createTestConvertibleDataset(config);
@@ -115,14 +120,13 @@ public class HiveAvroToOrcConverterTest {
@Test
public void testNestedSchemaDDLandDML() throws Exception {
String dbName = "testdb";
- String tableName = "testtable";
- String tableSdLoc = "/tmp/testtable";
+ String tableSdLoc = new File(this.tmpDir, TEST_TABLE).getAbsolutePath();
this.hiveMetastoreTestUtils.getLocalMetastoreClient().dropDatabase(dbName,
false, true, true);
- Table table = this.hiveMetastoreTestUtils.createTestAvroTable(dbName,
tableName, tableSdLoc, Optional.<String> absent());
+ Table table = this.hiveMetastoreTestUtils.createTestAvroTable(dbName,
TEST_TABLE, tableSdLoc, Optional.<String> absent());
Schema schema =
ConversionHiveTestUtils.readSchemaFromJsonFile(resourceDir,
"recordWithinRecordWithinRecord_nested.json");
- WorkUnitState wus = ConversionHiveTestUtils.createWus(dbName, tableName,
0);
+ WorkUnitState wus = ConversionHiveTestUtils.createWus(dbName, TEST_TABLE,
0);
wus.getJobState().setProp("orc.table.flatten.schema", "false");
try (HiveAvroToNestedOrcConverter converter = new
HiveAvroToNestedOrcConverter();) {
@@ -131,7 +135,7 @@ public class HiveAvroToOrcConverterTest {
.put("destinationFormats", "nestedOrc")
.put("nestedOrc.destination.tableName","testtable_orc_nested")
.put("nestedOrc.destination.dbName",dbName)
-
.put("nestedOrc.destination.dataPath","file:/tmp/testtable_orc_nested")
+ .put("nestedOrc.destination.dataPath","file:" + tableSdLoc +
"_orc_nested")
.build());
ConvertibleHiveDataset cd =
ConvertibleHiveDatasetTest.createTestConvertibleDataset(config);
diff --git a/gobblin-data-management/src/test/resources/hive-site.xml
b/gobblin-data-management/src/test/resources/hive-site.xml
index ae1b4cd..0cd205d 100644
--- a/gobblin-data-management/src/test/resources/hive-site.xml
+++ b/gobblin-data-management/src/test/resources/hive-site.xml
@@ -4,10 +4,6 @@
<value>org.apache.derby.jdbc.EmbeddedDriver</value>
</property>
<property>
- <name>javax.jdo.option.ConnectionURL</name>
-
<value>jdbc:derby:;databaseName=/tmp/scratch/hive/metastore_db;create=true</value>
- </property>
- <property>
<name>hive.metastore.warehouse.dir</name>
<value>/tmp/scratch/hive/warehouse</value>
</property>
diff --git
a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
index 3bef426..e8c8c73 100644
---
a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
+++
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
@@ -153,34 +153,42 @@ public class Kafka09DataWriter<K, V> implements
KafkaDataWriter<K, V> {
this.producer.flush();
}
- private void provisionTopic(String topicName,Config config) {
+ private void provisionTopic(String topicName, Config config) {
String zooKeeperPropKey = KafkaWriterConfigurationKeys.CLUSTER_ZOOKEEPER;
- if(!config.hasPath(zooKeeperPropKey)) {
- log.debug("Topic "+topicName+" is configured without the partition and
replication");
- return;
+ if (!config.hasPath(zooKeeperPropKey)) {
+ log.debug("Topic " + topicName + " is configured without the partition
and replication");
+ return;
}
String zookeeperConnect = config.getString(zooKeeperPropKey);
- int sessionTimeoutMs = ConfigUtils.getInt(config,
KafkaWriterConfigurationKeys.ZOOKEEPER_SESSION_TIMEOUT,
KafkaWriterConfigurationKeys.ZOOKEEPER_SESSION_TIMEOUT_DEFAULT);
- int connectionTimeoutMs = ConfigUtils.getInt(config,
KafkaWriterConfigurationKeys.ZOOKEEPER_CONNECTION_TIMEOUT,
KafkaWriterConfigurationKeys.ZOOKEEPER_CONNECTION_TIMEOUT_DEFAULT);
+ int sessionTimeoutMs = ConfigUtils.getInt(config,
KafkaWriterConfigurationKeys.ZOOKEEPER_SESSION_TIMEOUT,
+ KafkaWriterConfigurationKeys.ZOOKEEPER_SESSION_TIMEOUT_DEFAULT);
+ int connectionTimeoutMs = ConfigUtils.getInt(config,
KafkaWriterConfigurationKeys.ZOOKEEPER_CONNECTION_TIMEOUT,
+ KafkaWriterConfigurationKeys.ZOOKEEPER_CONNECTION_TIMEOUT_DEFAULT);
// Note: You must initialize the ZkClient with ZKStringSerializer. If you
don't, then
// createTopic() will only seem to work (it will return without error).
The topic will exist in
// only ZooKeeper and will be returned when listing topics, but Kafka
itself does not create the
// topic.
- ZkClient zkClient = new ZkClient(zookeeperConnect, sessionTimeoutMs,
connectionTimeoutMs, ZKStringSerializer$.MODULE$);
+ ZkClient zkClient =
+ new ZkClient(zookeeperConnect, sessionTimeoutMs, connectionTimeoutMs,
ZKStringSerializer$.MODULE$);
// Security for Kafka was added in Kafka 0.9.0.0
ZkUtils zkUtils = new ZkUtils(zkClient, new
ZkConnection(zookeeperConnect), false);
- int partitions = ConfigUtils.getInt(config,
KafkaWriterConfigurationKeys.PARTITION_COUNT,
KafkaWriterConfigurationKeys.PARTITION_COUNT_DEFAULT);
- int replication = ConfigUtils.getInt(config,
KafkaWriterConfigurationKeys.REPLICATION_COUNT,
KafkaWriterConfigurationKeys.PARTITION_COUNT_DEFAULT);
+ int partitions = ConfigUtils.getInt(config,
KafkaWriterConfigurationKeys.PARTITION_COUNT,
+ KafkaWriterConfigurationKeys.PARTITION_COUNT_DEFAULT);
+ int replication = ConfigUtils.getInt(config,
KafkaWriterConfigurationKeys.REPLICATION_COUNT,
+ KafkaWriterConfigurationKeys.PARTITION_COUNT_DEFAULT);
Properties topicConfig = new Properties();
- if(AdminUtils.topicExists(zkUtils, topicName)) {
- log.debug("Topic"+topicName+" already Exists with replication:
"+replication+" and partitions :"+partitions);
- return;
- }
- try {
- AdminUtils.createTopic(zkUtils, topicName, partitions, replication,
topicConfig);
- } catch (RuntimeException e) {
- throw new RuntimeException(e);
- }
- log.info("Created Topic "+topicName+" with replication: "+replication+"
and partitions :"+partitions);
+ if (AdminUtils.topicExists(zkUtils, topicName)) {
+ log.debug("Topic {} already exists with replication: {} and partitions:
{}", topicName, replication, partitions);
+ boolean deleteTopicIfExists = ConfigUtils.getBoolean(config,
KafkaWriterConfigurationKeys.DELETE_TOPIC_IF_EXISTS,
+ KafkaWriterConfigurationKeys.DEFAULT_DELETE_TOPIC_IF_EXISTS);
+ if (!deleteTopicIfExists) {
+ return;
+ } else {
+ log.debug("Deleting topic {}", topicName);
+ AdminUtils.deleteTopic(zkUtils, topicName);
+ }
}
+ AdminUtils.createTopic(zkUtils, topicName, partitions, replication,
topicConfig);
+ log.info("Created topic {} with replication: {} and partitions : {}",
topicName, replication, partitions);
+ }
}
\ No newline at end of file
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/source/extractor/extract/kafka/KafkaSimpleStreamingTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/source/extractor/extract/kafka/KafkaSimpleStreamingTest.java
index db66a85..b3fceac 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/source/extractor/extract/kafka/KafkaSimpleStreamingTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/source/extractor/extract/kafka/KafkaSimpleStreamingTest.java
@@ -140,7 +140,7 @@ public class KafkaSimpleStreamingTest {
* @throws InterruptedException
* @throws DataRecordException
*/
- @Test(timeOut = 10000)
+ @Test(timeOut = 30000)
public void testExtractor()
throws IOException, InterruptedException, DataRecordException {
final String topic = "testSimpleStreamingExtractor";
@@ -213,7 +213,7 @@ public class KafkaSimpleStreamingTest {
* original thread calls close on the extractor and verifies the waiting
thread gets an expected exception and exits
* as expected.
*/
- @Test(timeOut = 10000)
+ @Test(timeOut = 30000)
public void testThreadedExtractor() {
final String topic = "testThreadedExtractor";
final KafkaSimpleStreamingExtractor<String, byte[]> kSSE =
getStreamingExtractor(topic);
@@ -247,7 +247,7 @@ public class KafkaSimpleStreamingTest {
/**
* Test that the extractor barfs on not calling start
*/
- @Test(timeOut = 10000)
+ @Test(timeOut = 30000)
public void testExtractorStart() {
final String topic = "testExtractorStart";
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
index a54a75c..8a28bf2 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
@@ -27,6 +27,7 @@ import org.testng.annotations.BeforeSuite;
import org.testng.annotations.Test;
import com.google.api.client.util.Lists;
+import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.io.Closer;
import com.google.common.io.Files;
@@ -50,11 +51,9 @@ import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.writer.AsyncDataWriter;
import org.apache.gobblin.writer.WriteCallback;
-
@Test
@Slf4j
public class HighLevelConsumerTest extends KafkaTestBase {
-
private static final String BOOTSTRAP_SERVERS_KEY = "bootstrap.servers";
private static final String KAFKA_AUTO_OFFSET_RESET_KEY =
"auto.offset.reset";
private static final String SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT =
AbstractBaseKafkaConsumerClient.CONFIG_NAMESPACE + "." +
AbstractBaseKafkaConsumerClient.CONSUMER_CONFIG + ".";
@@ -72,20 +71,25 @@ public class HighLevelConsumerTest extends KafkaTestBase {
}
@BeforeSuite
- public void beforeSuite() throws Exception {
+ public void beforeSuite()
+ throws Exception {
startServers();
_closer = Closer.create();
Properties producerProps = new Properties();
producerProps.setProperty(KafkaWriterConfigurationKeys.KAFKA_TOPIC, TOPIC);
-
producerProps.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX
+ BOOTSTRAP_SERVERS_KEY, _kafkaBrokers);
-
producerProps.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX
+ KafkaWriterConfigurationKeys.VALUE_SERIALIZER_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer");
+ producerProps
+ .setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX
+ BOOTSTRAP_SERVERS_KEY, _kafkaBrokers);
+
producerProps.setProperty(KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX
+ + KafkaWriterConfigurationKeys.VALUE_SERIALIZER_CONFIG,
+ "org.apache.kafka.common.serialization.ByteArraySerializer");
producerProps.setProperty(KafkaWriterConfigurationKeys.CLUSTER_ZOOKEEPER,
this.getZkConnectString());
producerProps.setProperty(KafkaWriterConfigurationKeys.PARTITION_COUNT,
String.valueOf(NUM_PARTITIONS));
+
producerProps.setProperty(KafkaWriterConfigurationKeys.DELETE_TOPIC_IF_EXISTS,
String.valueOf(true));
AsyncDataWriter<byte[]> dataWriter = _closer.register(new
Kafka09DataWriter<byte[], byte[]>(producerProps));
List<byte[]> records = createByteArrayMessages();
WriteCallback mock = Mockito.mock(WriteCallback.class);
- for(byte[] record : records) {
+ for (byte[] record : records) {
dataWriter.write(record, mock);
}
dataWriter.flush();
@@ -114,12 +118,14 @@ public class HighLevelConsumerTest extends KafkaTestBase {
consumerProps.setProperty(ConfigurationKeys.KAFKA_BROKERS, _kafkaBrokers);
consumerProps.setProperty(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY,
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT +
KAFKA_AUTO_OFFSET_RESET_KEY, "earliest");
+ //Generate a brand new consumer group id to ensure there are no previously
committed offsets for this group id
+ String consumerGroupId = Joiner.on("-").join(TOPIC, "auto",
System.currentTimeMillis());
+ consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT +
HighLevelConsumer.GROUP_ID_KEY, consumerGroupId);
consumerProps.setProperty(HighLevelConsumer.ENABLE_AUTO_COMMIT_KEY,
"true");
-
MockedHighLevelConsumer consumer = new MockedHighLevelConsumer(TOPIC,
ConfigUtils.propertiesToConfig(consumerProps), NUM_PARTITIONS);
consumer.startAsync().awaitRunning();
- consumer.awaitExactlyNMessages(NUM_MSGS, 5000);
+ consumer.awaitExactlyNMessages(NUM_MSGS, 10000);
consumer.shutDown();
}
@@ -129,7 +135,9 @@ public class HighLevelConsumerTest extends KafkaTestBase {
consumerProps.setProperty(ConfigurationKeys.KAFKA_BROKERS, _kafkaBrokers);
consumerProps.setProperty(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY,
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT +
KAFKA_AUTO_OFFSET_RESET_KEY, "earliest");
-
+ //Generate a brand new consumer group id to ensure there are no previously
committed offsets for this group id
+ String consumerGroupId = Joiner.on("-").join(TOPIC, "manual",
System.currentTimeMillis());
+ consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT +
HighLevelConsumer.GROUP_ID_KEY, consumerGroupId);
// Setting this to a second to make sure we are committing offsets
frequently
consumerProps.put(HighLevelConsumer.OFFSET_COMMIT_TIME_THRESHOLD_SECS_KEY,
1);
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
index bb8681e..52af940 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
@@ -70,6 +70,8 @@ public class KafkaWriterConfigurationKeys {
static final int REPLICATION_COUNT_DEFAULT = 1;
public static final String PARTITION_COUNT = KAFKA_TOPIC_CONFIG +
"partitionCount";
static final int PARTITION_COUNT_DEFAULT = 1;
+ public static final String DELETE_TOPIC_IF_EXISTS = KAFKA_TOPIC_CONFIG +
"deleteTopicIfExists";
+ static final Boolean DEFAULT_DELETE_TOPIC_IF_EXISTS = false;
public static final String ZOOKEEPER_SESSION_TIMEOUT = CLUSTER_ZOOKEEPER +
".sto";
static final int ZOOKEEPER_SESSION_TIMEOUT_DEFAULT = 10000; // 10 seconds
public static final String ZOOKEEPER_CONNECTION_TIMEOUT = CLUSTER_ZOOKEEPER
+ ".cto";
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigClient.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigClient.java
index 58ae8d1..ae74f26 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigClient.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigClient.java
@@ -60,7 +60,7 @@ public class FlowConfigClient implements Closeable {
* @param serverUri address and port of the REST server
*/
public FlowConfigClient(String serverUri) {
- this(serverUri, Collections.<String, String>emptyMap());
+ this(serverUri, Collections.emptyMap());
}
public FlowConfigClient(String serverUri, Map<String, String> properties) {
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
index 98b9e90..27dd79a 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
@@ -77,7 +77,7 @@ public class FlowConfigV2Client implements Closeable {
LOG.debug("FlowConfigClient with serverUri " + serverUri);
_httpClientFactory = Optional.of(new HttpClientFactory());
- Client r2Client = new
TransportClientAdapter(_httpClientFactory.get().getClient(Collections.<String,
String>emptyMap()));
+ Client r2Client = new
TransportClientAdapter(_httpClientFactory.get().getClient(properties));
_restClient = Optional.of(new RestClient(r2Client, serverUri));
_flowconfigsV2RequestBuilders = createRequestBuilders();
}
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/MockedHighLevelConsumer.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/MockedHighLevelConsumer.java
index 520b354..36c0b6b 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/MockedHighLevelConsumer.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/MockedHighLevelConsumer.java
@@ -17,13 +17,13 @@
package org.apache.gobblin.runtime.kafka;
-import java.util.List;
import java.util.Map;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import com.google.common.base.Predicate;
-import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import javax.annotation.Nullable;
@@ -38,13 +38,13 @@ import org.apache.gobblin.testing.AssertWithBackoff;
public class MockedHighLevelConsumer extends HighLevelConsumer<byte[], byte[]>
{
@Getter
- private final List<byte[]> messages;
+ private final BlockingQueue<byte[]> messages;
@Getter
private final Map<KafkaPartition, Long> committedOffsets;
public MockedHighLevelConsumer(String topic, Config config, int numThreads) {
super(topic, config, numThreads);
- this.messages = Lists.newArrayList();
+ this.messages = new LinkedBlockingQueue<>();
this.committedOffsets = new ConcurrentHashMap<>();
}
@@ -59,7 +59,7 @@ public class MockedHighLevelConsumer extends
HighLevelConsumer<byte[], byte[]> {
@Override
protected void processMessage(DecodeableKafkaRecord<byte[], byte[]> message)
{
- this.messages.add(message.getValue());
+ this.messages.offer(message.getValue());
}
@Override
diff --git
a/gobblin-utility/src/test/java/org/apache/gobblin/util/RatedControlledFileSystemTest.java
b/gobblin-utility/src/test/java/org/apache/gobblin/util/RatedControlledFileSystemTest.java
index 4818b0a..0d2d849 100644
---
a/gobblin-utility/src/test/java/org/apache/gobblin/util/RatedControlledFileSystemTest.java
+++
b/gobblin-utility/src/test/java/org/apache/gobblin/util/RatedControlledFileSystemTest.java
@@ -32,6 +32,8 @@ import org.testng.annotations.Test;
import com.codahale.metrics.Meter;
import com.google.common.math.DoubleMath;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.util.limiter.Limiter;
import org.apache.gobblin.util.limiter.RateBasedLimiter;
@@ -39,6 +41,7 @@ import org.apache.gobblin.util.limiter.RateBasedLimiter;
/**
* Unit tests for {@link RatedControlledFileSystem}.
*/
+@Slf4j
@Test(groups = { "gobblin.util" })
public class RatedControlledFileSystemTest {
@@ -69,16 +72,15 @@ public class RatedControlledFileSystemTest {
}
@Test
- public void testFsOperation() throws IOException, InterruptedException {
+ public void testFsOperation() throws IOException {
Meter meter = new Meter();
Path fakePath = new Path("fakePath");
- for (int i = 0; i < 100; i++) {
+ for (int i = 0; i < 500; i++) {
Assert.assertFalse(this.rateControlledFs.exists(fakePath));
meter.mark();
- Thread.sleep((RANDOM.nextInt() & Integer.MAX_VALUE) % 10);
}
// Assert a fuzzy equal with 5% of tolerance
- Assert.assertTrue(DoubleMath.fuzzyEquals(meter.getMeanRate(), 20d, 20d *
0.05));
+ Assert.assertTrue(DoubleMath.fuzzyEquals(meter.getMeanRate(), 20d, 20d *
0.10));
}
@AfterClass