Repository: incubator-gobblin
Updated Branches:
  refs/heads/master ef438c872 -> f1bc746ca


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ExceptionLogger.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ExceptionLogger.java
 
b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ExceptionLogger.java
new file mode 100644
index 0000000..3a238ad
--- /dev/null
+++ 
b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ExceptionLogger.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.writer;
+
+/**
+ * An interface to log Exceptions
+ */
+public interface ExceptionLogger {
+
+  void log(Exception exception);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/FutureCallbackHolder.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/FutureCallbackHolder.java
 
b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/FutureCallbackHolder.java
new file mode 100644
index 0000000..f592ffa
--- /dev/null
+++ 
b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/FutureCallbackHolder.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.writer;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.math3.util.Pair;
+import org.apache.gobblin.writer.GenericWriteResponse;
+import org.apache.gobblin.writer.WriteCallback;
+import org.apache.gobblin.writer.WriteResponse;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkResponse;
+
+import javax.annotation.Nullable;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A class to hold Futures and Callbacks to support Async writes
+ */
+@Slf4j
+public class FutureCallbackHolder {
+
+  @Getter
+  private final ActionListener<BulkResponse> actionListener;
+  private final BlockingQueue<Pair<WriteResponse, Throwable>> 
writeResponseQueue = new ArrayBlockingQueue<>(1);
+  @Getter
+  private final Future<WriteResponse> future;
+  private final AtomicBoolean done = new AtomicBoolean(false);
+
+  public FutureCallbackHolder(final @Nullable WriteCallback callback,
+      ExceptionLogger exceptionLogger,
+      final MalformedDocPolicy malformedDocPolicy) {
+    this.future = new Future<WriteResponse>() {
+      @Override
+      public boolean cancel(boolean mayInterruptIfRunning) {
+        return false;
+      }
+
+      @Override
+      public boolean isCancelled() {
+        return false;
+      }
+
+      @Override
+      public boolean isDone() {
+        return done.get();
+      }
+
+      @Override
+      public WriteResponse get()
+          throws InterruptedException, ExecutionException {
+        Pair<WriteResponse, Throwable> writeResponseThrowablePair = 
writeResponseQueue.take();
+        return getWriteResponseorThrow(writeResponseThrowablePair);
+      }
+
+      @Override
+      public WriteResponse get(long timeout, TimeUnit unit)
+          throws InterruptedException, ExecutionException, TimeoutException {
+        Pair<WriteResponse, Throwable> writeResponseThrowablePair = 
writeResponseQueue.poll(timeout, unit);
+        if (writeResponseThrowablePair == null) {
+          throw new TimeoutException("Timeout exceeded while waiting for 
future to be done");
+        } else {
+          return getWriteResponseorThrow(writeResponseThrowablePair);
+        }
+      }
+    };
+
+    this.actionListener = new ActionListener<BulkResponse>() {
+      @Override
+      public void onResponse(BulkResponse bulkItemResponses) {
+        if (bulkItemResponses.hasFailures()) {
+          boolean logicalErrors = false;
+          boolean serverErrors = false;
+          for (BulkItemResponse bulkItemResponse: bulkItemResponses) {
+            if (bulkItemResponse.isFailed()) {
+              // check if the failure is permanent (logical) or transient 
(server)
+              if (isLogicalError(bulkItemResponse)) {
+                // check error policy
+                switch (malformedDocPolicy) {
+                  case IGNORE: {
+                    log.debug("Document id {} was malformed with error {}",
+                        bulkItemResponse.getId(),
+                        bulkItemResponse.getFailureMessage());
+                    break;
+                  }
+                  case WARN: {
+                    log.warn("Document id {} was malformed with error {}",
+                        bulkItemResponse.getId(),
+                        bulkItemResponse.getFailureMessage());
+                    break;
+                  }
+                  default: {
+                    // Pass through
+                  }
+                }
+                logicalErrors = true;
+              } else {
+                serverErrors = true;
+              }
+            }
+          }
+          if (serverErrors) {
+            onFailure(new RuntimeException("Partial failures in the batch: " + 
bulkItemResponses.buildFailureMessage()));
+          } else if (logicalErrors) {
+            // all errors found were logical, throw RuntimeException if policy 
says to Fail
+            switch (malformedDocPolicy) {
+              case FAIL: {
+                onFailure(new RuntimeException("Partial non-recoverable 
failures in the batch. To ignore these, set "
+                    + 
ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_MALFORMED_DOC_POLICY 
+ " to "
+                    + MalformedDocPolicy.IGNORE.name()));
+                break;
+              }
+              default: {
+                WriteResponse writeResponse = new 
GenericWriteResponse<BulkResponse>(bulkItemResponses);
+                writeResponseQueue.add(new Pair<WriteResponse, 
Throwable>(writeResponse, null));
+                if (callback != null) {
+                  callback.onSuccess(writeResponse);
+                }
+              }
+            }
+          }
+        } else {
+          WriteResponse writeResponse = new 
GenericWriteResponse<BulkResponse>(bulkItemResponses);
+          writeResponseQueue.add(new Pair<WriteResponse, 
Throwable>(writeResponse, null));
+          if (callback != null) {
+            callback.onSuccess(writeResponse);
+          }
+        }
+      }
+
+      private boolean isLogicalError(BulkItemResponse bulkItemResponse) {
+        String failureMessage = bulkItemResponse.getFailureMessage();
+        return failureMessage.contains("IllegalArgumentException")
+            || failureMessage.contains("illegal_argument_exception")
+            || failureMessage.contains("MapperParsingException")
+            || failureMessage.contains("mapper_parsing_exception");
+      }
+
+      @Override
+      public void onFailure(Exception exception) {
+        writeResponseQueue.add(new Pair<WriteResponse, Throwable>(null, 
exception));
+        if (exceptionLogger != null) {
+          exceptionLogger.log(exception);
+        }
+        if (callback != null) {
+          callback.onFailure(exception);
+        }
+      }
+    };
+  }
+
+
+  private WriteResponse getWriteResponseorThrow(Pair<WriteResponse, Throwable> 
writeResponseThrowablePair)
+      throws ExecutionException {
+    try {
+      if (writeResponseThrowablePair.getFirst() != null) {
+        return writeResponseThrowablePair.getFirst();
+      } else if (writeResponseThrowablePair.getSecond() != null) {
+        throw new ExecutionException(writeResponseThrowablePair.getSecond());
+      } else {
+        throw new ExecutionException(new RuntimeException("Could not find 
non-null WriteResponse pair"));
+      }
+    } finally {
+      done.set(true);
+    }
+
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/MalformedDocPolicy.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/MalformedDocPolicy.java
 
b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/MalformedDocPolicy.java
new file mode 100644
index 0000000..4449d60
--- /dev/null
+++ 
b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/MalformedDocPolicy.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.writer;
+
+/**
+ * A class to represent different policies for handling malformed documents
+ */
+public enum MalformedDocPolicy {
+  IGNORE,  // Ignore on failure
+  WARN,    // Log warning on failure
+  FAIL     // Fail on failure
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/ElasticsearchTestServer.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/ElasticsearchTestServer.java
 
b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/ElasticsearchTestServer.java
new file mode 100644
index 0000000..fb11d8d
--- /dev/null
+++ 
b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/ElasticsearchTestServer.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.gobblin.test.TestUtils;
+import org.testng.Assert;
+
+import com.google.common.base.Throwables;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A Test ElasticSearch server
+ */
+@Slf4j
+@NotThreadSafe
+public class ElasticsearchTestServer {
+
+
+  private static final String ELASTICSEARCH_VERSION="5.6.8";
+  private static final String 
TEST_ROOT_DIR="gobblin-modules/gobblin-elasticsearch/test-elasticsearch/";
+  // The clean elasticsearch instance is installed here
+  private static final String BASE_ELASTICSEARCH_INSTALL =TEST_ROOT_DIR + 
"elasticsearch-" + ELASTICSEARCH_VERSION;
+  // Per-test elasticsearch instances are installed under a different directory
+  private static final String TEST_INSTALL_PREFIX =TEST_ROOT_DIR + 
"es-test-install-";
+  private static final String ELASTICSEARCH_BIN="/bin/elasticsearch";
+  private static final String ELASTICSEARCH_CONFIG_FILE= 
"/config/elasticsearch.yml";
+  private static final String ELASTICSEARCH_JVMOPTS_FILE="/config/jvm.options";
+  private final String _testId;
+  private final int _tcpPort;
+  private Process elasticProcess;
+  private final int _httpPort;
+  private String _pid = ManagementFactory.getRuntimeMXBean().getName();
+  private final String _testInstallDirectory;
+  private AtomicBoolean _started = new AtomicBoolean(false);
+
+  public ElasticsearchTestServer(String testId)
+      throws IOException {
+    this(testId, TestUtils.findFreePort(), TestUtils.findFreePort());
+  }
+
+  private ElasticsearchTestServer(String testId, int httpPort, int tcpPort)
+      throws IOException {
+    _testId = testId;
+    _httpPort = httpPort;
+    _tcpPort = tcpPort;
+    _testInstallDirectory = TEST_INSTALL_PREFIX + _testId;
+    try {
+      createInstallation();
+    }
+    catch (Exception e) {
+      throw new IOException("Failed to create a test installation of 
elasticsearch", e);
+    }
+    configure();
+  }
+
+  public ElasticsearchTestServer()
+      throws IOException {
+    this(TestUtils.generateRandomAlphaString(25));
+  }
+
+  private void createInstallation()
+      throws IOException {
+    File srcDir = new File(BASE_ELASTICSEARCH_INSTALL);
+    if (!srcDir.exists()) {
+      throw new IOException("Could not find base elasticsearch instance 
installed at " + srcDir.getAbsolutePath() + "\n"
+          + "Run ./gradlew 
:gobblin-modules:gobblin-elasticsearch:installTestDependencies before running 
this test");
+    }
+    File destDir = new File(_testInstallDirectory);
+    log.debug("About to recreate directory : {}", destDir.getPath());
+    if (destDir.exists()) {
+      org.apache.commons.io.FileUtils.deleteDirectory(destDir);
+    }
+
+    String[] commands = {"cp", "-r", srcDir.getAbsolutePath(), 
destDir.getAbsolutePath()};
+    try {
+      log.debug("{}: Will run command: {}", this._pid, 
Arrays.toString(commands));
+      Process copyProcess = new 
ProcessBuilder().inheritIO().command(commands).start();
+      copyProcess.waitFor();
+    } catch (Exception e) {
+      log.error("Failed to create installation directory at {}", 
destDir.getPath(), e);
+      Throwables.propagate(e);
+    }
+  }
+
+
+
+
+  private void configure() throws IOException {
+    File configFile = new File(_testInstallDirectory + 
ELASTICSEARCH_CONFIG_FILE);
+    FileOutputStream configFileStream = new FileOutputStream(configFile);
+    try {
+      configFileStream.write(("cluster.name: " + _testId + 
"\n").getBytes("UTF-8"));
+      configFileStream.write(("http.port: " + _httpPort + 
"\n").getBytes("UTF-8"));
+      configFileStream.write(("transport.tcp.port: " + _tcpPort + 
"\n").getBytes("UTF-8"));
+    }
+    finally {
+      configFileStream.close();
+    }
+
+    File jvmConfigFile = new File(_testInstallDirectory + 
ELASTICSEARCH_JVMOPTS_FILE);
+    try (Stream<String> lines = Files.lines(jvmConfigFile.toPath())) {
+      List<String> newLines = lines.map(line -> 
line.replaceAll("^\\s*(-Xm[s,x]).*$", "$1128m"))
+      .collect(Collectors.toList());
+      Files.write(jvmConfigFile.toPath(), newLines);
+    }
+  }
+
+  public void start(int maxStartupTimeSeconds)
+  {
+    if (_started.get()) {
+      log.warn("ElasticSearch server has already been attempted to be 
started... returning without doing anything");
+      return;
+    }
+    _started.set(true);
+
+    log.error("{}: Starting elasticsearch server on port {}", this._pid, 
this._httpPort);
+    String[] commands = {_testInstallDirectory + ELASTICSEARCH_BIN};
+
+    try {
+      log.error("{}: Will run command: {}", this._pid, 
Arrays.toString(commands));
+      elasticProcess = new 
ProcessBuilder().inheritIO().command(commands).start();
+      if (elasticProcess != null) {
+        // register destroy of process on shutdown in-case of unclean test 
termination
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+          public void run() {
+            if (elasticProcess!=null) {
+              elasticProcess.destroy();
+            }
+          }
+        });
+      }
+    } catch (Exception e) {
+      log.error("Failed to start elasticsearch server", e);
+      Throwables.propagate(e);
+    }
+
+    boolean isUp = false;
+    int numTries = maxStartupTimeSeconds * 2;
+    while (!isUp && numTries-- > 0) {
+      try {
+        Thread.sleep(500); // wait 1/2 second
+        isUp = isUp();
+      } catch (Exception e) {
+
+      }
+    }
+    Assert.assertTrue(isUp, "Server is not up!");
+  }
+
+
+  public boolean isUp()
+  {
+    try {
+      URL url = new URL("http://localhost:"; + _httpPort + 
"/_cluster/health?wait_for_status=green");
+      long startTime = System.nanoTime();
+      HttpURLConnection httpURLConnection = (HttpURLConnection) 
url.openConnection();
+      int responseCode = httpURLConnection.getResponseCode();
+      log.info("Duration: {} seconds, Response code = {}",
+          (System.nanoTime() - startTime) / 1000000000.0,
+          responseCode);
+      if (responseCode == 200) { return true; } else {return false;}
+    }
+    catch (Exception e) {
+      Throwables.propagate(e);
+      return false;
+    }
+  }
+
+  public int getTransportPort() {
+    return _tcpPort;
+  }
+
+
+  public int getHttpPort() { return _httpPort; }
+
+
+  public void stop() {
+    if (elasticProcess != null) {
+      try {
+        elasticProcess.destroy();
+        elasticProcess = null; // set to null to prevent redundant call to 
destroy on shutdown
+      } catch (Exception e) {
+        log.warn("Failed to stop the ElasticSearch server", e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/ElasticsearchTestServerTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/ElasticsearchTestServerTest.java
 
b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/ElasticsearchTestServerTest.java
new file mode 100644
index 0000000..dc3294d
--- /dev/null
+++ 
b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/ElasticsearchTestServerTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch;
+
+import java.io.IOException;
+
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+
+/**
+ * A Test to test that the {@link ElasticsearchTestServer} class does what it 
is supposed to do
+ */
+public class ElasticsearchTestServerTest {
+
+
+  ElasticsearchTestServer _elasticsearchTestServer;
+
+  @BeforeSuite
+  public void startServer()
+      throws IOException {
+    _elasticsearchTestServer = new ElasticsearchTestServer();
+    _elasticsearchTestServer.start(60);
+  }
+  @Test
+  public void testServerStart()
+      throws InterruptedException, IOException {
+      _elasticsearchTestServer.start(60); // second start should be a no-op
+  }
+
+  @AfterSuite
+  public void stopServer() {
+    _elasticsearchTestServer.stop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ConfigBuilder.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ConfigBuilder.java
 
b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ConfigBuilder.java
new file mode 100644
index 0000000..f12528d
--- /dev/null
+++ 
b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ConfigBuilder.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.writer;
+
+import java.util.Properties;
+
+import 
org.apache.gobblin.elasticsearch.typemapping.AvroGenericRecordTypeMapper;
+import org.apache.gobblin.elasticsearch.typemapping.JsonTypeMapper;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.Setter;
+import lombok.experimental.Accessors;
+
+
+/**
+ * A helper class to build Config for Elasticsearch Writers
+ */
+@Accessors(chain=true)
+public class ConfigBuilder {
+  @Setter
+  String indexName;
+  @Setter
+  String indexType;
+  @Setter
+  int httpPort;
+  @Setter
+  int transportPort;
+  @Setter
+  boolean idMappingEnabled = true;
+  @Setter
+  String clientType = "REST";
+  @Setter
+  String typeMapperClassName;
+  @Setter
+  MalformedDocPolicy malformedDocPolicy;
+
+  Config build() {
+    Properties props = new Properties();
+    
props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_CLIENT_TYPE,
 clientType);
+    
props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_NAME,
 indexName);
+    
props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_TYPE,
 indexType);
+    
props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_ID_MAPPING_ENABLED,
+        "" + idMappingEnabled);
+    if (this.clientType.equalsIgnoreCase("rest")) {
+      
props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_HOSTS,
 "localhost:" + httpPort);
+    } else if (this.clientType.equalsIgnoreCase("transport")) {
+      
props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_HOSTS,
 "localhost:" + transportPort);
+    } else throw new RuntimeException("Client type needs to be one of 
rest/transport");
+    
props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS,
 typeMapperClassName);
+    if (malformedDocPolicy != null) {
+      
props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_MALFORMED_DOC_POLICY,
+          malformedDocPolicy.toString().toUpperCase());
+    }
+    return ConfigFactory.parseProperties(props);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchTransportClientWriterTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchTransportClientWriterTest.java
 
b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchTransportClientWriterTest.java
new file mode 100644
index 0000000..46ae680
--- /dev/null
+++ 
b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchTransportClientWriterTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.writer;
+
+import java.net.UnknownHostException;
+import java.util.Properties;
+
+import 
org.apache.gobblin.elasticsearch.typemapping.AvroGenericRecordTypeMapper;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+@Slf4j
+public class ElasticsearchTransportClientWriterTest {
+
+  @Test
+  public void testBadSslConfiguration()
+      throws UnknownHostException {
+    Properties props = new Properties();
+    
props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_NAME,
 "test");
+    
props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_TYPE,
 "test");
+    
props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS,
+        AvroGenericRecordTypeMapper.class.getCanonicalName());
+    
props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_ID_MAPPING_ENABLED,
 "true");
+    
props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_ENABLED,
 "true");
+    Config config = ConfigFactory.parseProperties(props);
+    try {
+      new ElasticsearchTransportClientWriter(config);
+      Assert.fail("Writer should not be constructed");
+    }
+    catch (Exception e) {
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterBaseTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterBaseTest.java
 
b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterBaseTest.java
new file mode 100644
index 0000000..341cd76
--- /dev/null
+++ 
b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterBaseTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.writer;
+
+import java.net.UnknownHostException;
+import java.util.Properties;
+
+import 
org.apache.gobblin.elasticsearch.typemapping.AvroGenericRecordTypeMapper;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+@Slf4j
+public class ElasticsearchWriterBaseTest {
+
+  public static ElasticsearchWriterBase getWriterBase(Config config)
+      throws UnknownHostException {
+    return new ElasticsearchWriterBase(config) {
+      @Override
+      int getDefaultPort() {
+        return 0;
+      }
+    };
+  }
+
+  private void assertFailsToConstruct(Properties props, String testScenario) {
+    assertConstructionExpectation(props, testScenario, false);
+  }
+
+  private void assertSucceedsToConstruct(Properties props, String 
testScenario) {
+    assertConstructionExpectation(props, testScenario, true);
+  }
+
+  private void assertConstructionExpectation(Properties props,
+      String testScenario,
+      Boolean constructionSuccess) {
+    Config config = ConfigFactory.parseProperties(props);
+    try {
+      ElasticsearchWriterBase writer = getWriterBase(config);
+      if (!constructionSuccess) {
+        Assert.fail("Test Scenario: " + testScenario + ": Writer should not be 
constructed");
+      }
+    }
+    catch (Exception e) {
+      if (constructionSuccess) {
+        Assert.fail("Test Scenario: " + testScenario + ": Writer should be 
constructed successfully");
+      }
+    }
+  }
+
+  @Test
+  public void testMinimalRequiredConfiguration()
+      throws UnknownHostException {
+    Properties props = new Properties();
+    
props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_NAME,
 "test");
+    
props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_TYPE,
 "test");
+    
props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS,
+        AvroGenericRecordTypeMapper.class.getCanonicalName());
+    assertSucceedsToConstruct(props, "minimal configuration");
+  }
+
+  @Test
+  public void testBadIndexNameConfiguration()
+      throws UnknownHostException {
+    Properties props = new Properties();
+    
props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_TYPE,
 "test");
+    
props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS,
+        AvroGenericRecordTypeMapper.class.getCanonicalName());
+    assertFailsToConstruct(props, "index name missing");
+  }
+
+
+
+  @Test
+  public void testBadIndexNameCasingConfiguration()
+      throws UnknownHostException {
+    Properties props = new Properties();
+    
props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_NAME,
 "Test");
+    
props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS,
+        AvroGenericRecordTypeMapper.class.getCanonicalName());
+    assertFailsToConstruct(props, "bad index name casing");
+  }
+
+  @Test
+  public void testBadIndexTypeConfiguration()
+      throws UnknownHostException {
+    Properties props = new Properties();
+    
props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_NAME,
 "test");
+    
props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS,
+        AvroGenericRecordTypeMapper.class.getCanonicalName());
+    assertFailsToConstruct(props, "no index type provided");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterIntegrationTest.java
 
b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterIntegrationTest.java
new file mode 100644
index 0000000..746171c
--- /dev/null
+++ 
b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterIntegrationTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.writer;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.math3.util.Pair;
+import org.apache.gobblin.elasticsearch.ElasticsearchTestServer;
+import org.apache.gobblin.test.AvroRecordGenerator;
+import org.apache.gobblin.test.JsonRecordGenerator;
+import org.apache.gobblin.test.PayloadType;
+import org.apache.gobblin.test.RecordTypeGenerator;
+import org.apache.gobblin.test.TestUtils;
+import org.apache.gobblin.writer.AsyncWriterManager;
+import org.apache.gobblin.writer.BatchAsyncDataWriter;
+import org.apache.gobblin.writer.BufferedAsyncDataWriter;
+import org.apache.gobblin.writer.DataWriter;
+import org.apache.gobblin.writer.SequentialBasedBatchAccumulator;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+import org.testng.Assert;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+@Slf4j
+public class ElasticsearchWriterIntegrationTest {
+
+
+  private ElasticsearchTestServer _esTestServer;
+  private String pid = ManagementFactory.getRuntimeMXBean().getName();
+
+  private List<WriterVariant> variants;
+  private List<RecordTypeGenerator> recordGenerators;
+
+  ElasticsearchWriterIntegrationTest() {
+    variants = ImmutableList.of(new RestWriterVariant(),
+        new TransportWriterVariant());
+    recordGenerators = ImmutableList.of(new AvroRecordGenerator(), new 
JsonRecordGenerator());
+  }
+
+  @BeforeSuite
+  public void startServers()
+      throws IOException {
+    log.error("{}: Starting Elasticsearch Server", pid);
+    _esTestServer = new ElasticsearchTestServer();
+    _esTestServer.start(60);
+  }
+
+  @AfterSuite
+  public void stopServers() {
+    log.error("{}: Stopping Elasticsearch Server", pid);
+    _esTestServer.stop();
+  }
+
+
+  @Test
+  public void testSingleRecordWrite()
+      throws IOException {
+
+    for (WriterVariant writerVariant : variants) {
+      for (RecordTypeGenerator recordVariant : recordGenerators) {
+
+        String indexName = "posts" + writerVariant.getName().toLowerCase();
+        String indexType = recordVariant.getName();
+        Config config = writerVariant.getConfigBuilder()
+            .setIndexName(indexName)
+            .setIndexType(indexType)
+            .setTypeMapperClassName(recordVariant.getTypeMapperClassName())
+            .setHttpPort(_esTestServer.getHttpPort())
+            .setTransportPort(_esTestServer.getTransportPort())
+            .build();
+
+        TestClient testClient = writerVariant.getTestClient(config);
+        SequentialBasedBatchAccumulator<Object> batchAccumulator = new 
SequentialBasedBatchAccumulator<>(config);
+        BufferedAsyncDataWriter bufferedAsyncDataWriter = new 
BufferedAsyncDataWriter(batchAccumulator, 
writerVariant.getBatchAsyncDataWriter(config));
+
+
+        String id = TestUtils.generateRandomAlphaString(10);
+        Object testRecord = recordVariant.getRecord(id, PayloadType.STRING);
+
+        DataWriter writer = 
AsyncWriterManager.builder().failureAllowanceRatio(0.0).retriesEnabled(false).config(config)
+            .asyncDataWriter(bufferedAsyncDataWriter).build();
+
+        try {
+          testClient.recreateIndex(indexName);
+          writer.write(testRecord);
+          writer.commit();
+        } finally {
+          writer.close();
+        }
+
+        try {
+          GetResponse response = testClient.get(new GetRequest(indexName, 
indexType, id));
+          Assert.assertEquals(response.getId(), id, "Response id matches 
request");
+          Assert.assertEquals(response.isExists(), true, "Document not found");
+        } catch (Exception e) {
+          Assert.fail("Failed to get a response", e);
+        } finally {
+          testClient.close();
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testMalformedDocCombinations()
+      throws IOException {
+    for (WriterVariant writerVariant : variants) {
+      for (RecordTypeGenerator recordVariant : recordGenerators) {
+        for (MalformedDocPolicy policy : MalformedDocPolicy.values()) {
+          testMalformedDocs(writerVariant, recordVariant, policy);
+        }
+      }
+    }
+  }
+
+
+
+  /**
+   * Sends two docs in a single batch with different field types
+   * Triggers Elasticsearch server to send back an exception due to malformed 
docs
+   * @throws IOException
+   */
+  public void testMalformedDocs(WriterVariant writerVariant, 
RecordTypeGenerator recordVariant, MalformedDocPolicy malformedDocPolicy)
+      throws IOException {
+
+    String indexName = writerVariant.getName().toLowerCase();
+    String indexType = 
(recordVariant.getName()+malformedDocPolicy.name()).toLowerCase();
+    Config config = writerVariant.getConfigBuilder()
+        .setIdMappingEnabled(true)
+        .setIndexName(indexName)
+        .setIndexType(indexType)
+        .setHttpPort(_esTestServer.getHttpPort())
+        .setTransportPort(_esTestServer.getTransportPort())
+        .setTypeMapperClassName(recordVariant.getTypeMapperClassName())
+        .setMalformedDocPolicy(malformedDocPolicy)
+        .build();
+
+
+    TestClient testClient = writerVariant.getTestClient(config);
+    testClient.recreateIndex(indexName);
+
+    String id1=TestUtils.generateRandomAlphaString(10);
+    String id2=TestUtils.generateRandomAlphaString(10);
+
+    Object testRecord1 = recordVariant.getRecord(id1, PayloadType.LONG);
+    Object testRecord2 = recordVariant.getRecord(id2, PayloadType.MAP);
+
+    SequentialBasedBatchAccumulator<Object> batchAccumulator = new 
SequentialBasedBatchAccumulator<>(config);
+    BatchAsyncDataWriter elasticsearchWriter = 
writerVariant.getBatchAsyncDataWriter(config);
+    BufferedAsyncDataWriter bufferedAsyncDataWriter = new 
BufferedAsyncDataWriter(batchAccumulator, elasticsearchWriter);
+
+
+    DataWriter writer = AsyncWriterManager.builder()
+        .failureAllowanceRatio(0.0)
+        .retriesEnabled(false)
+        .config(config)
+        .asyncDataWriter(bufferedAsyncDataWriter)
+        .build();
+
+    try {
+      writer.write(testRecord1);
+      writer.write(testRecord2);
+      writer.commit();
+      writer.close();
+      if (malformedDocPolicy == MalformedDocPolicy.FAIL) {
+        Assert.fail("Should have thrown an exception if malformed doc policy 
was set to Fail");
+      }
+    }
+    catch (Exception e) {
+      switch (malformedDocPolicy) {
+        case IGNORE:case WARN:{
+          Assert.fail("Should not have failed if malformed doc policy was set 
to ignore or warn", e);
+          break;
+        }
+        case FAIL: {
+          // pass through
+          break;
+        }
+        default: {
+          throw new RuntimeException("This test does not handle this 
policyType : " + malformedDocPolicy.toString());
+        }
+      }
+    }
+
+    // Irrespective of policy, first doc should be inserted and second doc 
should fail
+    int docsIndexed = 0;
+    try {
+      {
+        GetResponse response = testClient.get(new GetRequest(indexName, 
indexType, id1));
+        Assert.assertEquals(response.getId(), id1, "Response id matches 
request");
+        System.out.println(malformedDocPolicy + ":" + response.toString());
+        if (response.isExists()) {
+          docsIndexed++;
+        }
+      }
+      {
+        GetResponse response = testClient.get(new GetRequest(indexName, 
indexType, id2));
+        Assert.assertEquals(response.getId(), id2, "Response id matches 
request");
+        System.out.println(malformedDocPolicy + ":" + response.toString());
+        if (response.isExists()) {
+          docsIndexed++;
+        }
+      }
+      // only one doc should be found
+      Assert.assertEquals(docsIndexed, 1, "Only one document should be 
indexed");
+    }
+    catch (Exception e) {
+      Assert.fail("Failed to get a response", e);
+    }
+    finally {
+      testClient.close();
+    }
+  }
+
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/RestWriterVariant.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/RestWriterVariant.java
 
b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/RestWriterVariant.java
new file mode 100644
index 0000000..94d8e56
--- /dev/null
+++ 
b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/RestWriterVariant.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.writer;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import org.apache.gobblin.writer.BatchAsyncDataWriter;
+import org.apache.http.HttpEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.testng.Assert;
+
+import com.typesafe.config.Config;
+
+
+/**
+ * A variant that uses the {@link ElasticsearchRestWriter}
+ */
+public class RestWriterVariant implements WriterVariant {
+
+  private ElasticsearchRestWriter _restWriter;
+  @Override
+  public String getName() {
+    return "rest";
+  }
+
+  @Override
+  public ConfigBuilder getConfigBuilder() {
+    return new ConfigBuilder()
+        .setClientType("REST");
+  }
+
+  @Override
+  public BatchAsyncDataWriter getBatchAsyncDataWriter(Config config)
+      throws IOException {
+    _restWriter = new ElasticsearchRestWriter(config);
+    return _restWriter;
+  }
+
+  @Override
+  public TestClient getTestClient(Config config)
+      throws IOException {
+    final ElasticsearchRestWriter restWriter = new 
ElasticsearchRestWriter(config);
+    final RestHighLevelClient highLevelClient = 
restWriter.getRestHighLevelClient();
+    return new TestClient() {
+      @Override
+      public GetResponse get(GetRequest getRequest)
+          throws IOException {
+        return highLevelClient.get(getRequest);
+      }
+
+      @Override
+      public void recreateIndex(String indexName)
+          throws IOException {
+        RestClient restClient = restWriter.getRestLowLevelClient();
+        try {
+          restClient.performRequest("DELETE", "/" + indexName);
+        } catch (Exception e) {
+          // ok since index may not exist
+        }
+
+        String indexSettings = "{\"settings\" : 
{\"index\":{\"number_of_shards\":1,\"number_of_replicas\":1}}}";
+        HttpEntity entity = new StringEntity(indexSettings, 
ContentType.APPLICATION_JSON);
+
+        Response putResponse = restClient.performRequest("PUT", "/" + 
indexName, Collections.emptyMap(), entity);
+        Assert.assertEquals(putResponse.getStatusLine().getStatusCode(),200, 
"Recreate index succeeded");
+      }
+
+      @Override
+      public void close()
+          throws IOException {
+        restWriter.close();
+
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/TestClient.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/TestClient.java
 
b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/TestClient.java
new file mode 100644
index 0000000..31f08b1
--- /dev/null
+++ 
b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/TestClient.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.writer;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+
+
+/**
+ * An interface to describe a functional Elasticsearch client to aid in 
verification
+ * of test results
+ */
+
+public interface TestClient extends Closeable {
+  GetResponse get(GetRequest getRequest)
+      throws IOException;
+
+  void recreateIndex(String indexName)
+      throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/TransportWriterVariant.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/TransportWriterVariant.java
 
b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/TransportWriterVariant.java
new file mode 100644
index 0000000..eb28c9a
--- /dev/null
+++ 
b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/TransportWriterVariant.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.writer;
+
+import java.io.IOException;
+
+import org.apache.gobblin.writer.BatchAsyncDataWriter;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
+import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.IndexNotFoundException;
+import org.testng.Assert;
+
+import com.typesafe.config.Config;
+
+
+/**
+ * A variant that uses the {@link ElasticsearchTransportClientWriter}
+ */
+public class TransportWriterVariant implements WriterVariant {
+  @Override
+  public String getName() {
+    return "transport";
+  }
+
+  @Override
+  public ConfigBuilder getConfigBuilder() {
+    return new ConfigBuilder()
+        .setClientType("transport");
+  }
+
+  @Override
+  public BatchAsyncDataWriter getBatchAsyncDataWriter(Config config)
+      throws IOException {
+    ElasticsearchTransportClientWriter transportClientWriter = new 
ElasticsearchTransportClientWriter(config);
+    return transportClientWriter;
+  }
+
+  @Override
+  public TestClient getTestClient(Config config)
+      throws IOException {
+    final ElasticsearchTransportClientWriter transportClientWriter = new 
ElasticsearchTransportClientWriter(config);
+    final TransportClient transportClient = 
transportClientWriter.getTransportClient();
+    return new TestClient() {
+      @Override
+      public GetResponse get(GetRequest getRequest)
+          throws IOException {
+        try {
+          return transportClient.get(getRequest).get();
+        } catch (Exception e) {
+          throw new IOException(e);
+        }
+      }
+
+      @Override
+      public void recreateIndex(String indexName)
+          throws IOException {
+        DeleteIndexRequestBuilder dirBuilder = 
transportClient.admin().indices().prepareDelete(indexName);
+        try {
+          DeleteIndexResponse diResponse = dirBuilder.execute().actionGet();
+        } catch (IndexNotFoundException ie) {
+          System.out.println("Index not found... that's ok");
+        }
+
+        CreateIndexRequestBuilder cirBuilder = 
transportClient.admin().indices().prepareCreate(indexName);
+        CreateIndexResponse ciResponse = cirBuilder.execute().actionGet();
+        Assert.assertTrue(ciResponse.isAcknowledged(), "Create index 
succeeeded");
+      }
+
+      @Override
+      public void close()
+          throws IOException {
+        transportClientWriter.close();
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/WriterVariant.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/WriterVariant.java
 
b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/WriterVariant.java
new file mode 100644
index 0000000..581ec2e
--- /dev/null
+++ 
b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/WriterVariant.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.elasticsearch.writer;
+
+import java.io.IOException;
+
+import org.apache.gobblin.writer.BatchAsyncDataWriter;
+
+import com.typesafe.config.Config;
+
+
+/**
+ * An interface to implement Writer variants to enable generic testing
+ */
+public interface WriterVariant {
+
+  String getName();
+
+  ConfigBuilder getConfigBuilder();
+
+  BatchAsyncDataWriter getBatchAsyncDataWriter(Config config)
+      throws IOException;
+
+  TestClient getTestClient(Config config)
+      throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/AvroRecordGenerator.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/AvroRecordGenerator.java
 
b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/AvroRecordGenerator.java
new file mode 100644
index 0000000..29433f3
--- /dev/null
+++ 
b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/AvroRecordGenerator.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.Collections;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import 
org.apache.gobblin.elasticsearch.typemapping.AvroGenericRecordTypeMapper;
+
+
+/**
+ * A generator of Avro records of type {@link GenericRecord}
+ */
+public class AvroRecordGenerator implements RecordTypeGenerator<GenericRecord> 
{
+  @Override
+  public String getName() {
+    return "avro";
+  }
+
+  @Override
+  public String getTypeMapperClassName() {
+    return AvroGenericRecordTypeMapper.class.getCanonicalName();
+  }
+
+  @Override
+  public GenericRecord getRecord(String id, PayloadType payloadType) {
+    GenericRecord record = getTestAvroRecord(id, payloadType);
+    return record;
+  }
+
+  static GenericRecord getTestAvroRecord(String identifier, PayloadType 
payloadType) {
+    Schema dataRecordSchema =
+        
SchemaBuilder.record("Data").fields().name("data").type().bytesType().noDefault().name("flags").type().intType()
+            .noDefault().endRecord();
+
+    Schema schema;
+    Object payloadValue;
+    switch (payloadType) {
+      case STRING: {
+        schema = SchemaBuilder.record("TestRecord").fields()
+            .name("id").type().stringType().noDefault()
+            .name("key").type().stringType().noDefault()
+            .name("data").type(dataRecordSchema).noDefault()
+            .endRecord();
+        payloadValue = TestUtils.generateRandomAlphaString(20);
+        break;
+      }
+      case LONG: {
+        schema = SchemaBuilder.record("TestRecord").fields()
+            .name("id").type().stringType().noDefault()
+            .name("key").type().longType().noDefault()
+            .name("data").type(dataRecordSchema).noDefault()
+            .endRecord();
+        payloadValue = TestUtils.generateRandomLong();
+        break;
+      }
+      case MAP: {
+        schema = SchemaBuilder.record("TestRecord").fields()
+            .name("id").type().stringType().noDefault()
+            .name("key").type().map().values().stringType().noDefault()
+            .name("data").type(dataRecordSchema).noDefault()
+            .endRecord();
+        payloadValue = Collections.EMPTY_MAP;
+        break;
+      }
+      default: {
+        throw new RuntimeException("Do not know how to handle this time");
+      }
+    }
+
+    GenericData.Record testRecord = new GenericData.Record(schema);
+
+    String testContent = "hello world";
+
+    GenericData.Record dataRecord = new GenericData.Record(dataRecordSchema);
+    dataRecord.put("data", 
ByteBuffer.wrap(testContent.getBytes(Charset.forName("UTF-8"))));
+    dataRecord.put("flags", 0);
+
+    testRecord.put("key", payloadValue);
+    testRecord.put("data", dataRecord);
+    testRecord.put("id", identifier);
+    return testRecord;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/JsonRecordGenerator.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/JsonRecordGenerator.java
 
b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/JsonRecordGenerator.java
new file mode 100644
index 0000000..7aea277
--- /dev/null
+++ 
b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/JsonRecordGenerator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.test;
+
+import java.util.Collections;
+
+import org.apache.gobblin.elasticsearch.typemapping.JsonTypeMapper;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+
+
+/**
+ * A generator of {@link JsonElement} records
+ */
+public class JsonRecordGenerator implements RecordTypeGenerator<JsonElement> {
+  private final Gson gson = new Gson();
+
+  @Override
+  public String getName() {
+    return "json";
+  }
+
+  @Override
+  public String getTypeMapperClassName() {
+    return JsonTypeMapper.class.getCanonicalName();
+  }
+
+  static class TestObject<T> {
+    private String id;
+    private T key;
+
+    TestObject(String id, T payload) {
+      this.id = id;
+      this.key = payload;
+    }
+  }
+
+  @Override
+  public JsonElement getRecord(String id, PayloadType payloadType) {
+    Object testObject;
+    switch (payloadType) {
+      case STRING: {
+        testObject = new TestObject(id, 
TestUtils.generateRandomAlphaString(20));
+        break;
+      }
+      case LONG: {
+        testObject = new TestObject(id, TestUtils.generateRandomLong());
+        break;
+      }
+      case MAP: {
+        testObject = new TestObject(id, Collections.EMPTY_MAP);
+        break;
+      }
+      default:
+        throw new RuntimeException("Do not know how to handle this type of 
payload");
+    }
+    JsonElement jsonElement = gson.toJsonTree(testObject);
+    return jsonElement;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/PayloadType.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/PayloadType.java
 
b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/PayloadType.java
new file mode 100644
index 0000000..d793c47
--- /dev/null
+++ 
b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/PayloadType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.test;
+
+/**
+ * An enumeration of Payload types
+ * Used to configure the record in tests
+ */
+public enum PayloadType {
+  STRING,
+  LONG,
+  MAP
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/RecordTypeGenerator.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/RecordTypeGenerator.java
 
b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/RecordTypeGenerator.java
new file mode 100644
index 0000000..00c798f
--- /dev/null
+++ 
b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/RecordTypeGenerator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.test;
+
+/**
+ * An interface to describe a generator of records
+ */
+public interface RecordTypeGenerator<T> {
+  /**
+   * The name of this record type
+   * @return
+   */
+  String getName();
+
+  /**
+   * A {@link org.apache.gobblin.elasticsearch.typemapping.TypeMapper} that 
can work with
+   * records of this type
+   * @return
+   */
+  String getTypeMapperClassName();
+
+  /**
+   * Generate a record with the provided characteristics
+   * @param identifier
+   * @param payloadType
+   * @return a record of the type T
+   */
+  T getRecord(String identifier, PayloadType payloadType);
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-eventhub/src/test/java/org/apache/gobblin/eventhub/writer/EventhubBatchTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-eventhub/src/test/java/org/apache/gobblin/eventhub/writer/EventhubBatchTest.java
 
b/gobblin-modules/gobblin-eventhub/src/test/java/org/apache/gobblin/eventhub/writer/EventhubBatchTest.java
index ab4bffa..9db3782 100644
--- 
a/gobblin-modules/gobblin-eventhub/src/test/java/org/apache/gobblin/eventhub/writer/EventhubBatchTest.java
+++ 
b/gobblin-modules/gobblin-eventhub/src/test/java/org/apache/gobblin/eventhub/writer/EventhubBatchTest.java
@@ -19,6 +19,8 @@ package org.apache.gobblin.eventhub.writer;
 import java.io.IOException;
 
 import org.apache.gobblin.writer.BytesBoundedBatch;
+import org.apache.gobblin.writer.LargeMessagePolicy;
+import org.apache.gobblin.writer.RecordTooLargeException;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -28,41 +30,44 @@ import org.apache.gobblin.writer.WriteCallback;
 public class EventhubBatchTest {
 
   @Test
-  public void testBatchWithLargeRecord() throws IOException {
+  public void testBatchWithLargeRecord()
+      throws IOException, RecordTooLargeException {
     // Assume memory size has only 2 bytes
     BytesBoundedBatch batch = new BytesBoundedBatch(8, 3000);
 
     String record = "abcdefgh";
 
     // Record is larger than the memory size limit, the first append should 
fail
-    Assert.assertNull(batch.tryAppend(record, WriteCallback.EMPTY));
+    Assert.assertNull(batch.tryAppend(record, WriteCallback.EMPTY, 
LargeMessagePolicy.DROP));
 
     // The second append should still fail
-    Assert.assertNull(batch.tryAppend(record, WriteCallback.EMPTY));
+    Assert.assertNull(batch.tryAppend(record, WriteCallback.EMPTY, 
LargeMessagePolicy.DROP));
   }
 
   @Test
-  public void testBatch() throws IOException {
+  public void testBatch()
+      throws IOException, RecordTooLargeException {
     // Assume memory size has only 200 bytes
     BytesBoundedBatch batch = new BytesBoundedBatch(200, 3000);
 
     // Add additional 15 bytes overhead, total size is 27 bytes
     String record = "abcdefgh";
 
-    Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY));
-    Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY));
-    Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY));
-    Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY));
-    Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY));
-    Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY));
-    Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY));
+    LargeMessagePolicy policy = LargeMessagePolicy.DROP;
+    Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY, policy));
+    Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY, policy));
+    Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY, policy));
+    Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY, policy));
+    Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY, policy));
+    Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY, policy));
+    Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY, policy));
 
     // Batch has room for 8th record
-    Assert.assertEquals(batch.hasRoom(record), true);
-    Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY));
+    Assert.assertEquals(batch.hasRoom(record, policy), true);
+    Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY, policy));
 
     // Batch has no room for 9th record
-    Assert.assertEquals(batch.hasRoom(record), false);
-    Assert.assertNull(batch.tryAppend(record, WriteCallback.EMPTY));
+    Assert.assertEquals(batch.hasRoom(record, policy), false);
+    Assert.assertNull(batch.tryAppend(record, WriteCallback.EMPTY, policy));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-test-utils/src/main/java/org/apache/gobblin/test/TestUtils.java
----------------------------------------------------------------------
diff --git 
a/gobblin-test-utils/src/main/java/org/apache/gobblin/test/TestUtils.java 
b/gobblin-test-utils/src/main/java/org/apache/gobblin/test/TestUtils.java
index a583198..68c79e9 100644
--- a/gobblin-test-utils/src/main/java/org/apache/gobblin/test/TestUtils.java
+++ b/gobblin-test-utils/src/main/java/org/apache/gobblin/test/TestUtils.java
@@ -45,6 +45,27 @@ public class TestUtils {
     return messageBytes;
   }
 
+  private static final char[] alphas = new char[26];
+
+  public static Long generateRandomLong() {
+    return rng.nextLong();
+  }
+
+  static {
+    char ch = 'a';
+    for (int i = 0; i < 26; i++) {
+      alphas[i] = ch++;
+    }
+  }
+
+  public static String generateRandomAlphaString(int stringLength) {
+    char[] newString = new char[stringLength];
+    for (int i = 0; i < stringLength; ++i)
+    {
+      newString[i] = alphas[rng.nextInt(26)];
+    }
+    return new String(newString);
+  }
 
   /**
    * TODO: Currently generates a static schema avro record.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gradle/scripts/globalDependencies.gradle
----------------------------------------------------------------------
diff --git a/gradle/scripts/globalDependencies.gradle 
b/gradle/scripts/globalDependencies.gradle
index d1d2e03..d64db67 100644
--- a/gradle/scripts/globalDependencies.gradle
+++ b/gradle/scripts/globalDependencies.gradle
@@ -23,20 +23,22 @@ subprojects {
     configurations {
       compile
       dependencies {
-        compile(externalDependency.hadoopCommon) {
-          exclude module: 'servlet-api'
-        }
-        compile externalDependency.hadoopClientCore
-        compile externalDependency.hadoopAnnotations
-        if (project.name.equals('gobblin-runtime') || 
project.name.equals('gobblin-test')) {
-          compile externalDependency.hadoopClientCommon
-        }
-        compile(externalDependency.guava) {
-          force = true
-        }
-        compile(externalDependency.commonsCodec) {
-          force = true
-        }
+        if (!project.name.contains('gobblin-elasticsearch-deps')) {
+          compile(externalDependency.hadoopCommon) {
+            exclude module: 'servlet-api'
+          }
+          compile externalDependency.hadoopClientCore
+          compile externalDependency.hadoopAnnotations
+          if (project.name.equals('gobblin-runtime') || 
project.name.equals('gobblin-test')) {
+            compile externalDependency.hadoopClientCommon
+          }
+          compile(externalDependency.guava) {
+              force = true
+          }
+      }
+          compile(externalDependency.commonsCodec) {
+            force = true
+          }
 
         // Required to add JDK's tool jar, which is required to run byteman 
tests.
         testCompile (files(((URLClassLoader) 
ToolProvider.getSystemToolClassLoader()).getURLs()))

Reply via email to