Repository: incubator-gobblin Updated Branches: refs/heads/master ef438c872 -> f1bc746ca
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ExceptionLogger.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ExceptionLogger.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ExceptionLogger.java new file mode 100644 index 0000000..3a238ad --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ExceptionLogger.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.gobblin.elasticsearch.writer;
+
+/**
+ * An interface to log Exceptions.
+ * <p>
+ * Within this package, {@link FutureCallbackHolder} hands every exception that reaches its
+ * {@code onFailure} handler to the logger it was constructed with (when a non-null logger was supplied).
+ */
+public interface ExceptionLogger {
+
+  /**
+   * Records the given exception.
+   *
+   * @param exception the exception to log
+   */
+  void log(Exception exception);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/FutureCallbackHolder.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/FutureCallbackHolder.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/FutureCallbackHolder.java new file mode 100644 index 0000000..f592ffa --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/FutureCallbackHolder.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.elasticsearch.writer; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.math3.util.Pair; +import org.apache.gobblin.writer.GenericWriteResponse; +import org.apache.gobblin.writer.WriteCallback; +import org.apache.gobblin.writer.WriteResponse; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkResponse; + +import javax.annotation.Nullable; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + + +/** + * A class to hold Futures and Callbacks to support Async writes + */ +@Slf4j +public class FutureCallbackHolder { + + @Getter + private final ActionListener<BulkResponse> actionListener; + private final BlockingQueue<Pair<WriteResponse, Throwable>> writeResponseQueue = new ArrayBlockingQueue<>(1); + @Getter + private final Future<WriteResponse> future; + private final AtomicBoolean done = new AtomicBoolean(false); + + public FutureCallbackHolder(final @Nullable WriteCallback callback, + ExceptionLogger exceptionLogger, + final MalformedDocPolicy malformedDocPolicy) { + this.future = new Future<WriteResponse>() { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return done.get(); + } + + @Override + public WriteResponse get() + throws InterruptedException, ExecutionException { + Pair<WriteResponse, Throwable> writeResponseThrowablePair = writeResponseQueue.take(); + return getWriteResponseorThrow(writeResponseThrowablePair); + } + + @Override + public WriteResponse get(long timeout, TimeUnit unit) 
+ throws InterruptedException, ExecutionException, TimeoutException { + Pair<WriteResponse, Throwable> writeResponseThrowablePair = writeResponseQueue.poll(timeout, unit); + if (writeResponseThrowablePair == null) { + throw new TimeoutException("Timeout exceeded while waiting for future to be done"); + } else { + return getWriteResponseorThrow(writeResponseThrowablePair); + } + } + }; + + this.actionListener = new ActionListener<BulkResponse>() { + @Override + public void onResponse(BulkResponse bulkItemResponses) { + if (bulkItemResponses.hasFailures()) { + boolean logicalErrors = false; + boolean serverErrors = false; + for (BulkItemResponse bulkItemResponse: bulkItemResponses) { + if (bulkItemResponse.isFailed()) { + // check if the failure is permanent (logical) or transient (server) + if (isLogicalError(bulkItemResponse)) { + // check error policy + switch (malformedDocPolicy) { + case IGNORE: { + log.debug("Document id {} was malformed with error {}", + bulkItemResponse.getId(), + bulkItemResponse.getFailureMessage()); + break; + } + case WARN: { + log.warn("Document id {} was malformed with error {}", + bulkItemResponse.getId(), + bulkItemResponse.getFailureMessage()); + break; + } + default: { + // Pass through + } + } + logicalErrors = true; + } else { + serverErrors = true; + } + } + } + if (serverErrors) { + onFailure(new RuntimeException("Partial failures in the batch: " + bulkItemResponses.buildFailureMessage())); + } else if (logicalErrors) { + // all errors found were logical, throw RuntimeException if policy says to Fail + switch (malformedDocPolicy) { + case FAIL: { + onFailure(new RuntimeException("Partial non-recoverable failures in the batch. 
To ignore these, set " + + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_MALFORMED_DOC_POLICY + " to " + + MalformedDocPolicy.IGNORE.name())); + break; + } + default: { + WriteResponse writeResponse = new GenericWriteResponse<BulkResponse>(bulkItemResponses); + writeResponseQueue.add(new Pair<WriteResponse, Throwable>(writeResponse, null)); + if (callback != null) { + callback.onSuccess(writeResponse); + } + } + } + } + } else { + WriteResponse writeResponse = new GenericWriteResponse<BulkResponse>(bulkItemResponses); + writeResponseQueue.add(new Pair<WriteResponse, Throwable>(writeResponse, null)); + if (callback != null) { + callback.onSuccess(writeResponse); + } + } + } + + private boolean isLogicalError(BulkItemResponse bulkItemResponse) { + String failureMessage = bulkItemResponse.getFailureMessage(); + return failureMessage.contains("IllegalArgumentException") + || failureMessage.contains("illegal_argument_exception") + || failureMessage.contains("MapperParsingException") + || failureMessage.contains("mapper_parsing_exception"); + } + + @Override + public void onFailure(Exception exception) { + writeResponseQueue.add(new Pair<WriteResponse, Throwable>(null, exception)); + if (exceptionLogger != null) { + exceptionLogger.log(exception); + } + if (callback != null) { + callback.onFailure(exception); + } + } + }; + } + + + private WriteResponse getWriteResponseorThrow(Pair<WriteResponse, Throwable> writeResponseThrowablePair) + throws ExecutionException { + try { + if (writeResponseThrowablePair.getFirst() != null) { + return writeResponseThrowablePair.getFirst(); + } else if (writeResponseThrowablePair.getSecond() != null) { + throw new ExecutionException(writeResponseThrowablePair.getSecond()); + } else { + throw new ExecutionException(new RuntimeException("Could not find non-null WriteResponse pair")); + } + } finally { + done.set(true); + } + + } + + +} 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/MalformedDocPolicy.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/MalformedDocPolicy.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/MalformedDocPolicy.java new file mode 100644 index 0000000..4449d60 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/MalformedDocPolicy.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.gobblin.elasticsearch.writer;
+
+/**
+ * A class to represent different policies for handling malformed documents.
+ * <p>
+ * {@link FutureCallbackHolder} consults the policy for documents in a bulk response that failed with a
+ * permanent (logical) error. A batch that also contains server errors is failed regardless of the policy.
+ */
+public enum MalformedDocPolicy {
+  /** The malformed document is logged at debug level and the batch is still reported as successful. */
+  IGNORE, // Ignore on failure
+  /** The malformed document is logged at warn level and the batch is still reported as successful. */
+  WARN, // Log warning on failure
+  /** The batch is reported as failed when it contains malformed documents. */
+  FAIL // Fail on failure
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/ElasticsearchTestServer.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/ElasticsearchTestServer.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/ElasticsearchTestServer.java new file mode 100644 index 0000000..fb11d8d --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/ElasticsearchTestServer.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.elasticsearch; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.gobblin.test.TestUtils; +import org.testng.Assert; + +import com.google.common.base.Throwables; + +import javax.annotation.concurrent.NotThreadSafe; +import lombok.extern.slf4j.Slf4j; + + +/** + * A Test ElasticSearch server + */ +@Slf4j +@NotThreadSafe +public class ElasticsearchTestServer { + + + private static final String ELASTICSEARCH_VERSION="5.6.8"; + private static final String TEST_ROOT_DIR="gobblin-modules/gobblin-elasticsearch/test-elasticsearch/"; + // The clean elasticsearch instance is installed here + private static final String BASE_ELASTICSEARCH_INSTALL =TEST_ROOT_DIR + "elasticsearch-" + ELASTICSEARCH_VERSION; + // Per-test elasticsearch instances are installed under a different directory + private static final String TEST_INSTALL_PREFIX =TEST_ROOT_DIR + "es-test-install-"; + private static final String ELASTICSEARCH_BIN="/bin/elasticsearch"; + private static final String ELASTICSEARCH_CONFIG_FILE= "/config/elasticsearch.yml"; + private static final String ELASTICSEARCH_JVMOPTS_FILE="/config/jvm.options"; + private final String _testId; + private final int _tcpPort; + private Process elasticProcess; + private final int _httpPort; + private String _pid = ManagementFactory.getRuntimeMXBean().getName(); + private final String _testInstallDirectory; + private AtomicBoolean _started = new AtomicBoolean(false); + + public ElasticsearchTestServer(String testId) + throws IOException { + this(testId, TestUtils.findFreePort(), TestUtils.findFreePort()); + } + + private ElasticsearchTestServer(String 
testId, int httpPort, int tcpPort) + throws IOException { + _testId = testId; + _httpPort = httpPort; + _tcpPort = tcpPort; + _testInstallDirectory = TEST_INSTALL_PREFIX + _testId; + try { + createInstallation(); + } + catch (Exception e) { + throw new IOException("Failed to create a test installation of elasticsearch", e); + } + configure(); + } + + public ElasticsearchTestServer() + throws IOException { + this(TestUtils.generateRandomAlphaString(25)); + } + + private void createInstallation() + throws IOException { + File srcDir = new File(BASE_ELASTICSEARCH_INSTALL); + if (!srcDir.exists()) { + throw new IOException("Could not find base elasticsearch instance installed at " + srcDir.getAbsolutePath() + "\n" + + "Run ./gradlew :gobblin-modules:gobblin-elasticsearch:installTestDependencies before running this test"); + } + File destDir = new File(_testInstallDirectory); + log.debug("About to recreate directory : {}", destDir.getPath()); + if (destDir.exists()) { + org.apache.commons.io.FileUtils.deleteDirectory(destDir); + } + + String[] commands = {"cp", "-r", srcDir.getAbsolutePath(), destDir.getAbsolutePath()}; + try { + log.debug("{}: Will run command: {}", this._pid, Arrays.toString(commands)); + Process copyProcess = new ProcessBuilder().inheritIO().command(commands).start(); + copyProcess.waitFor(); + } catch (Exception e) { + log.error("Failed to create installation directory at {}", destDir.getPath(), e); + Throwables.propagate(e); + } + } + + + + + private void configure() throws IOException { + File configFile = new File(_testInstallDirectory + ELASTICSEARCH_CONFIG_FILE); + FileOutputStream configFileStream = new FileOutputStream(configFile); + try { + configFileStream.write(("cluster.name: " + _testId + "\n").getBytes("UTF-8")); + configFileStream.write(("http.port: " + _httpPort + "\n").getBytes("UTF-8")); + configFileStream.write(("transport.tcp.port: " + _tcpPort + "\n").getBytes("UTF-8")); + } + finally { + configFileStream.close(); + } + + File 
jvmConfigFile = new File(_testInstallDirectory + ELASTICSEARCH_JVMOPTS_FILE); + try (Stream<String> lines = Files.lines(jvmConfigFile.toPath())) { + List<String> newLines = lines.map(line -> line.replaceAll("^\\s*(-Xm[s,x]).*$", "$1128m")) + .collect(Collectors.toList()); + Files.write(jvmConfigFile.toPath(), newLines); + } + } + + public void start(int maxStartupTimeSeconds) + { + if (_started.get()) { + log.warn("ElasticSearch server has already been attempted to be started... returning without doing anything"); + return; + } + _started.set(true); + + log.error("{}: Starting elasticsearch server on port {}", this._pid, this._httpPort); + String[] commands = {_testInstallDirectory + ELASTICSEARCH_BIN}; + + try { + log.error("{}: Will run command: {}", this._pid, Arrays.toString(commands)); + elasticProcess = new ProcessBuilder().inheritIO().command(commands).start(); + if (elasticProcess != null) { + // register destroy of process on shutdown in-case of unclean test termination + Runtime.getRuntime().addShutdownHook(new Thread() { + public void run() { + if (elasticProcess!=null) { + elasticProcess.destroy(); + } + } + }); + } + } catch (Exception e) { + log.error("Failed to start elasticsearch server", e); + Throwables.propagate(e); + } + + boolean isUp = false; + int numTries = maxStartupTimeSeconds * 2; + while (!isUp && numTries-- > 0) { + try { + Thread.sleep(500); // wait 1/2 second + isUp = isUp(); + } catch (Exception e) { + + } + } + Assert.assertTrue(isUp, "Server is not up!"); + } + + + public boolean isUp() + { + try { + URL url = new URL("http://localhost:" + _httpPort + "/_cluster/health?wait_for_status=green"); + long startTime = System.nanoTime(); + HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection(); + int responseCode = httpURLConnection.getResponseCode(); + log.info("Duration: {} seconds, Response code = {}", + (System.nanoTime() - startTime) / 1000000000.0, + responseCode); + if (responseCode == 200) { return true; } 
else {return false;} + } + catch (Exception e) { + Throwables.propagate(e); + return false; + } + } + + public int getTransportPort() { + return _tcpPort; + } + + + public int getHttpPort() { return _httpPort; } + + + public void stop() { + if (elasticProcess != null) { + try { + elasticProcess.destroy(); + elasticProcess = null; // set to null to prevent redundant call to destroy on shutdown + } catch (Exception e) { + log.warn("Failed to stop the ElasticSearch server", e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/ElasticsearchTestServerTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/ElasticsearchTestServerTest.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/ElasticsearchTestServerTest.java new file mode 100644 index 0000000..dc3294d --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/ElasticsearchTestServerTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */
+package org.apache.gobblin.elasticsearch;
+
+import java.io.IOException;
+
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+
+/**
+ * A Test to test that the {@link ElasticsearchTestServer} class does what it is supposed to do
+ */
+public class ElasticsearchTestServerTest {
+
+
+  // The server under test; created and started in startServer(), stopped in stopServer()
+  ElasticsearchTestServer _elasticsearchTestServer;
+
+  /**
+   * Creates a test server with a random test id and starts it, allowing 60 seconds for startup.
+   *
+   * @throws IOException if the test installation cannot be created
+   */
+  @BeforeSuite
+  public void startServer()
+      throws IOException {
+    _elasticsearchTestServer = new ElasticsearchTestServer();
+    _elasticsearchTestServer.start(60);
+  }
+  /**
+   * Calls start a second time on the server that startServer() already started.
+   */
+  @Test
+  public void testServerStart()
+      throws InterruptedException, IOException {
+    _elasticsearchTestServer.start(60); // second start should be a no-op
+  }
+
+  /**
+   * Stops the test server.
+   */
+  @AfterSuite
+  public void stopServer() {
+    _elasticsearchTestServer.stop();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ConfigBuilder.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ConfigBuilder.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ConfigBuilder.java new file mode 100644 index 0000000..f12528d --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ConfigBuilder.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.elasticsearch.writer; + +import java.util.Properties; + +import org.apache.gobblin.elasticsearch.typemapping.AvroGenericRecordTypeMapper; +import org.apache.gobblin.elasticsearch.typemapping.JsonTypeMapper; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import lombok.Setter; +import lombok.experimental.Accessors; + + +/** + * A helper class to build Config for Elasticsearch Writers + */ +@Accessors(chain=true) +public class ConfigBuilder { + @Setter + String indexName; + @Setter + String indexType; + @Setter + int httpPort; + @Setter + int transportPort; + @Setter + boolean idMappingEnabled = true; + @Setter + String clientType = "REST"; + @Setter + String typeMapperClassName; + @Setter + MalformedDocPolicy malformedDocPolicy; + + Config build() { + Properties props = new Properties(); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_CLIENT_TYPE, clientType); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_NAME, indexName); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_TYPE, indexType); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_ID_MAPPING_ENABLED, + "" + idMappingEnabled); + if (this.clientType.equalsIgnoreCase("rest")) { + 
props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_HOSTS, "localhost:" + httpPort); + } else if (this.clientType.equalsIgnoreCase("transport")) { + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_HOSTS, "localhost:" + transportPort); + } else throw new RuntimeException("Client type needs to be one of rest/transport"); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS, typeMapperClassName); + if (malformedDocPolicy != null) { + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_MALFORMED_DOC_POLICY, + malformedDocPolicy.toString().toUpperCase()); + } + return ConfigFactory.parseProperties(props); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchTransportClientWriterTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchTransportClientWriterTest.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchTransportClientWriterTest.java new file mode 100644 index 0000000..46ae680 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchTransportClientWriterTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.elasticsearch.writer; + +import java.net.UnknownHostException; +import java.util.Properties; + +import org.apache.gobblin.elasticsearch.typemapping.AvroGenericRecordTypeMapper; +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import lombok.extern.slf4j.Slf4j; + + +@Slf4j +public class ElasticsearchTransportClientWriterTest { + + @Test + public void testBadSslConfiguration() + throws UnknownHostException { + Properties props = new Properties(); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_NAME, "test"); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_TYPE, "test"); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS, + AvroGenericRecordTypeMapper.class.getCanonicalName()); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_ID_MAPPING_ENABLED, "true"); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_ENABLED, "true"); + Config config = ConfigFactory.parseProperties(props); + try { + new ElasticsearchTransportClientWriter(config); + Assert.fail("Writer should not be constructed"); + } + catch (Exception e) { + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterBaseTest.java 
---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterBaseTest.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterBaseTest.java new file mode 100644 index 0000000..341cd76 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterBaseTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.elasticsearch.writer; + +import java.net.UnknownHostException; +import java.util.Properties; + +import org.apache.gobblin.elasticsearch.typemapping.AvroGenericRecordTypeMapper; +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import lombok.extern.slf4j.Slf4j; + + +@Slf4j +public class ElasticsearchWriterBaseTest { + + public static ElasticsearchWriterBase getWriterBase(Config config) + throws UnknownHostException { + return new ElasticsearchWriterBase(config) { + @Override + int getDefaultPort() { + return 0; + } + }; + } + + private void assertFailsToConstruct(Properties props, String testScenario) { + assertConstructionExpectation(props, testScenario, false); + } + + private void assertSucceedsToConstruct(Properties props, String testScenario) { + assertConstructionExpectation(props, testScenario, true); + } + + private void assertConstructionExpectation(Properties props, + String testScenario, + Boolean constructionSuccess) { + Config config = ConfigFactory.parseProperties(props); + try { + ElasticsearchWriterBase writer = getWriterBase(config); + if (!constructionSuccess) { + Assert.fail("Test Scenario: " + testScenario + ": Writer should not be constructed"); + } + } + catch (Exception e) { + if (constructionSuccess) { + Assert.fail("Test Scenario: " + testScenario + ": Writer should be constructed successfully"); + } + } + } + + @Test + public void testMinimalRequiredConfiguration() + throws UnknownHostException { + Properties props = new Properties(); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_NAME, "test"); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_TYPE, "test"); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS, + AvroGenericRecordTypeMapper.class.getCanonicalName()); + 
assertSucceedsToConstruct(props, "minimal configuration"); + } + + @Test + public void testBadIndexNameConfiguration() + throws UnknownHostException { + Properties props = new Properties(); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_TYPE, "test"); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS, + AvroGenericRecordTypeMapper.class.getCanonicalName()); + assertFailsToConstruct(props, "index name missing"); + } + + + + @Test + public void testBadIndexNameCasingConfiguration() + throws UnknownHostException { + Properties props = new Properties(); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_NAME, "Test"); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS, + AvroGenericRecordTypeMapper.class.getCanonicalName()); + assertFailsToConstruct(props, "bad index name casing"); + } + + @Test + public void testBadIndexTypeConfiguration() + throws UnknownHostException { + Properties props = new Properties(); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_NAME, "test"); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS, + AvroGenericRecordTypeMapper.class.getCanonicalName()); + assertFailsToConstruct(props, "no index type provided"); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterIntegrationTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterIntegrationTest.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterIntegrationTest.java new file mode 100644 index 0000000..746171c --- 
/dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterIntegrationTest.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.elasticsearch.writer; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.math3.util.Pair; +import org.apache.gobblin.elasticsearch.ElasticsearchTestServer; +import org.apache.gobblin.test.AvroRecordGenerator; +import org.apache.gobblin.test.JsonRecordGenerator; +import org.apache.gobblin.test.PayloadType; +import org.apache.gobblin.test.RecordTypeGenerator; +import org.apache.gobblin.test.TestUtils; +import org.apache.gobblin.writer.AsyncWriterManager; +import org.apache.gobblin.writer.BatchAsyncDataWriter; +import org.apache.gobblin.writer.BufferedAsyncDataWriter; +import org.apache.gobblin.writer.DataWriter; +import org.apache.gobblin.writer.SequentialBasedBatchAccumulator; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.testng.Assert; +import org.testng.annotations.AfterSuite; +import 
org.testng.annotations.BeforeSuite; +import org.testng.annotations.Test; + +import com.google.common.collect.ImmutableList; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + + +
/**
 * Integration tests that exercise every {@link WriterVariant} with every
 * {@link RecordTypeGenerator} against an {@link ElasticsearchTestServer} that is
 * started before the suite and stopped after it.
 */
@Slf4j
public class ElasticsearchWriterIntegrationTest {


  private ElasticsearchTestServer _esTestServer;
  private String pid = ManagementFactory.getRuntimeMXBean().getName();

  private List<WriterVariant> variants;
  private List<RecordTypeGenerator> recordGenerators;

  ElasticsearchWriterIntegrationTest() {
    variants = ImmutableList.of(new RestWriterVariant(),
        new TransportWriterVariant());
    recordGenerators = ImmutableList.of(new AvroRecordGenerator(), new JsonRecordGenerator());
  }

  /**
   * Starts the embedded Elasticsearch test server.
   * @throws IOException if the server cannot be created or started
   */
  @BeforeSuite
  public void startServers()
      throws IOException {
    log.error("{}: Starting Elasticsearch Server", pid);
    _esTestServer = new ElasticsearchTestServer();
    _esTestServer.start(60);
  }

  /**
   * Stops the embedded Elasticsearch test server.
   */
  @AfterSuite
  public void stopServers() {
    log.error("{}: Stopping Elasticsearch Server", pid);
    _esTestServer.stop();
  }


  /**
   * Writes one record through each writer/record-type combination and reads it back by id.
   * @throws IOException if a writer or test client fails
   */
  @Test
  public void testSingleRecordWrite()
      throws IOException {

    for (WriterVariant writerVariant : variants) {
      for (RecordTypeGenerator recordVariant : recordGenerators) {

        String indexName = "posts" + writerVariant.getName().toLowerCase();
        String indexType = recordVariant.getName();
        Config config = writerVariant.getConfigBuilder()
            .setIndexName(indexName)
            .setIndexType(indexType)
            .setTypeMapperClassName(recordVariant.getTypeMapperClassName())
            .setHttpPort(_esTestServer.getHttpPort())
            .setTransportPort(_esTestServer.getTransportPort())
            .build();

        // try-with-resources: the test client is released even when the write path fails
        try (TestClient testClient = writerVariant.getTestClient(config)) {
          SequentialBasedBatchAccumulator<Object> batchAccumulator = new SequentialBasedBatchAccumulator<>(config);
          BufferedAsyncDataWriter bufferedAsyncDataWriter =
              new BufferedAsyncDataWriter(batchAccumulator, writerVariant.getBatchAsyncDataWriter(config));

          String id = TestUtils.generateRandomAlphaString(10);
          Object testRecord = recordVariant.getRecord(id, PayloadType.STRING);

          DataWriter writer = AsyncWriterManager.builder().failureAllowanceRatio(0.0).retriesEnabled(false).config(config)
              .asyncDataWriter(bufferedAsyncDataWriter).build();

          try {
            testClient.recreateIndex(indexName);
            writer.write(testRecord);
            writer.commit();
          } finally {
            writer.close();
          }

          try {
            GetResponse response = testClient.get(new GetRequest(indexName, indexType, id));
            Assert.assertEquals(response.getId(), id, "Response id matches request");
            Assert.assertEquals(response.isExists(), true, "Document not found");
          } catch (Exception e) {
            Assert.fail("Failed to get a response", e);
          }
        }
      }
    }
  }

  /**
   * Runs {@link #testMalformedDocs} for every writer variant, record type and
   * {@link MalformedDocPolicy}.
   * @throws IOException if a writer or test client fails
   */
  @Test
  public void testMalformedDocCombinations()
      throws IOException {
    for (WriterVariant writerVariant : variants) {
      for (RecordTypeGenerator recordVariant : recordGenerators) {
        for (MalformedDocPolicy policy : MalformedDocPolicy.values()) {
          testMalformedDocs(writerVariant, recordVariant, policy);
        }
      }
    }
  }



  /**
   * Sends two docs in a single batch with different field types.
   * Triggers Elasticsearch server to send back an exception due to malformed docs.
   *
   * @param writerVariant the writer implementation under test
   * @param recordVariant the generator producing the two records
   * @param malformedDocPolicy the policy the writer is configured with
   * @throws IOException if the test client cannot be created, used or closed
   */
  public void testMalformedDocs(WriterVariant writerVariant, RecordTypeGenerator recordVariant, MalformedDocPolicy malformedDocPolicy)
      throws IOException {

    String indexName = writerVariant.getName().toLowerCase();
    String indexType = (recordVariant.getName()+malformedDocPolicy.name()).toLowerCase();
    Config config = writerVariant.getConfigBuilder()
        .setIdMappingEnabled(true)
        .setIndexName(indexName)
        .setIndexType(indexType)
        .setHttpPort(_esTestServer.getHttpPort())
        .setTransportPort(_esTestServer.getTransportPort())
        .setTypeMapperClassName(recordVariant.getTypeMapperClassName())
        .setMalformedDocPolicy(malformedDocPolicy)
        .build();

    // try-with-resources: the test client is released on every exit path, including assertion failures
    try (TestClient testClient = writerVariant.getTestClient(config)) {
      testClient.recreateIndex(indexName);

      String id1 = TestUtils.generateRandomAlphaString(10);
      String id2 = TestUtils.generateRandomAlphaString(10);

      Object testRecord1 = recordVariant.getRecord(id1, PayloadType.LONG);
      Object testRecord2 = recordVariant.getRecord(id2, PayloadType.MAP);

      SequentialBasedBatchAccumulator<Object> batchAccumulator = new SequentialBasedBatchAccumulator<>(config);
      BatchAsyncDataWriter elasticsearchWriter = writerVariant.getBatchAsyncDataWriter(config);
      BufferedAsyncDataWriter bufferedAsyncDataWriter = new BufferedAsyncDataWriter(batchAccumulator, elasticsearchWriter);

      DataWriter writer = AsyncWriterManager.builder()
          .failureAllowanceRatio(0.0)
          .retriesEnabled(false)
          .config(config)
          .asyncDataWriter(bufferedAsyncDataWriter)
          .build();

      Exception writeFailure = null;
      try {
        try {
          writer.write(testRecord1);
          writer.write(testRecord2);
          writer.commit();
        } finally {
          // always close: previously the writer stayed open whenever write/commit threw
          writer.close();
        }
      } catch (Exception e) {
        // a failure from write, commit or close is judged against the policy below
        writeFailure = e;
      }
      assertOutcomeMatchesPolicy(malformedDocPolicy, writeFailure);

      // Irrespective of policy, first doc should be inserted and second doc should fail
      int docsIndexed = 0;
      try {
        {
          GetResponse response = testClient.get(new GetRequest(indexName, indexType, id1));
          Assert.assertEquals(response.getId(), id1, "Response id matches request");
          System.out.println(malformedDocPolicy + ":" + response.toString());
          if (response.isExists()) {
            docsIndexed++;
          }
        }
        {
          GetResponse response = testClient.get(new GetRequest(indexName, indexType, id2));
          Assert.assertEquals(response.getId(), id2, "Response id matches request");
          System.out.println(malformedDocPolicy + ":" + response.toString());
          if (response.isExists()) {
            docsIndexed++;
          }
        }
        // only one doc should be found
        Assert.assertEquals(docsIndexed, 1, "Only one document should be indexed");
      }
      catch (Exception e) {
        Assert.fail("Failed to get a response", e);
      }
    }
  }

  /**
   * Checks that the result of the write/commit/close sequence is the one the policy calls for:
   * FAIL must have produced an exception, IGNORE and WARN must not have.
   *
   * @param policy the configured malformed-doc policy
   * @param writeFailure the exception raised by the writer, or {@code null} if none was raised
   */
  private static void assertOutcomeMatchesPolicy(MalformedDocPolicy policy, Exception writeFailure) {
    if (writeFailure == null) {
      if (policy == MalformedDocPolicy.FAIL) {
        Assert.fail("Should have thrown an exception if malformed doc policy was set to Fail");
      }
      return;
    }
    switch (policy) {
      case IGNORE:
      case WARN: {
        Assert.fail("Should not have failed if malformed doc policy was set to ignore or warn", writeFailure);
        break;
      }
      case FAIL: {
        // expected outcome
        break;
      }
      default: {
        throw new RuntimeException("This test does not handle this policyType : " + policy.toString());
      }
    }
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/RestWriterVariant.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/RestWriterVariant.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/RestWriterVariant.java new file mode 100644 index 0000000..94d8e56 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/RestWriterVariant.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.elasticsearch.writer; + +import java.io.IOException; +import java.util.Collections; + +import org.apache.gobblin.writer.BatchAsyncDataWriter; +import org.apache.http.HttpEntity; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; +import org.testng.Assert; + +import com.typesafe.config.Config; + + +
/**
 * {@link WriterVariant} implementation built on the {@link ElasticsearchRestWriter}.
 */
public class RestWriterVariant implements WriterVariant {

  // the writer most recently returned by getBatchAsyncDataWriter
  private ElasticsearchRestWriter _restWriter;

  @Override
  public String getName() {
    return "rest";
  }

  @Override
  public ConfigBuilder getConfigBuilder() {
    return new ConfigBuilder()
        .setClientType("REST");
  }

  @Override
  public BatchAsyncDataWriter getBatchAsyncDataWriter(Config config)
      throws IOException {
    ElasticsearchRestWriter created = new ElasticsearchRestWriter(config);
    _restWriter = created;
    return created;
  }

  @Override
  public TestClient getTestClient(Config config)
      throws IOException {
    return new RestTestClient(new ElasticsearchRestWriter(config));
  }

  /**
   * {@link TestClient} that talks to Elasticsearch through the REST clients owned by a
   * dedicated {@link ElasticsearchRestWriter}; closing the client closes that writer.
   */
  private static final class RestTestClient implements TestClient {
    private final ElasticsearchRestWriter writer;
    private final RestHighLevelClient highLevelClient;

    RestTestClient(ElasticsearchRestWriter writer) {
      this.writer = writer;
      this.highLevelClient = writer.getRestHighLevelClient();
    }

    @Override
    public GetResponse get(GetRequest getRequest)
        throws IOException {
      return highLevelClient.get(getRequest);
    }

    @Override
    public void recreateIndex(String indexName)
        throws IOException {
      RestClient lowLevelClient = writer.getRestLowLevelClient();
      String indexPath = "/" + indexName;
      try {
        lowLevelClient.performRequest("DELETE", indexPath);
      } catch (Exception ignored) {
        // ok since index may not exist
      }

      HttpEntity settingsEntity = new StringEntity(
          "{\"settings\" : {\"index\":{\"number_of_shards\":1,\"number_of_replicas\":1}}}",
          ContentType.APPLICATION_JSON);

      Response createResponse = lowLevelClient.performRequest("PUT", indexPath, Collections.emptyMap(), settingsEntity);
      int status = createResponse.getStatusLine().getStatusCode();
      Assert.assertEquals(status, 200, "Recreate index succeeded");
    }

    @Override
    public void close()
        throws IOException {
      writer.close();
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/TestClient.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/TestClient.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/TestClient.java new file mode 100644 index 0000000..31f08b1 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/TestClient.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.elasticsearch.writer; + +import java.io.Closeable; +import java.io.IOException; + +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; + + +
/**
 * A functional Elasticsearch client used by tests to prepare an index and to verify
 * what a writer stored in it. Instances are {@link Closeable}.
 */
public interface TestClient extends Closeable {

  /**
   * Executes a get request.
   *
   * @param getRequest the request to execute
   * @return the response to the request
   * @throws IOException if the request cannot be executed
   */
  GetResponse get(GetRequest getRequest)
      throws IOException;

  /**
   * Recreates the named index.
   *
   * @param indexName name of the index to recreate
   * @throws IOException if the index cannot be recreated
   */
  void recreateIndex(String indexName)
      throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/TransportWriterVariant.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/TransportWriterVariant.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/TransportWriterVariant.java new file mode 100644 index 0000000..eb28c9a --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/TransportWriterVariant.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.elasticsearch.writer; + +import java.io.IOException; + +import org.apache.gobblin.writer.BatchAsyncDataWriter; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexNotFoundException; +import org.testng.Assert; + +import com.typesafe.config.Config; + + +
/**
 * A variant that uses the {@link ElasticsearchTransportClientWriter}
 */
public class TransportWriterVariant implements WriterVariant {
  @Override
  public String getName() {
    return "transport";
  }

  @Override
  public ConfigBuilder getConfigBuilder() {
    return new ConfigBuilder()
        .setClientType("transport");
  }

  @Override
  public BatchAsyncDataWriter getBatchAsyncDataWriter(Config config)
      throws IOException {
    return new ElasticsearchTransportClientWriter(config);
  }

  /**
   * Builds a {@link TestClient} on top of the transport client of a dedicated
   * {@link ElasticsearchTransportClientWriter}; closing the test client closes that writer.
   *
   * @param config the writer configuration
   * @return a test client backed by the transport client
   * @throws IOException if the writer cannot be created
   */
  @Override
  public TestClient getTestClient(Config config)
      throws IOException {
    final ElasticsearchTransportClientWriter transportClientWriter = new ElasticsearchTransportClientWriter(config);
    final TransportClient transportClient = transportClientWriter.getTransportClient();
    return new TestClient() {
      @Override
      public GetResponse get(GetRequest getRequest)
          throws IOException {
        try {
          return transportClient.get(getRequest).get();
        } catch (Exception e) {
          if (e instanceof InterruptedException) {
            // keep the interrupt visible to callers further up the stack
            Thread.currentThread().interrupt();
          }
          throw new IOException(e);
        }
      }

      @Override
      public void recreateIndex(String indexName)
          throws IOException {
        DeleteIndexRequestBuilder dirBuilder = transportClient.admin().indices().prepareDelete(indexName);
        try {
          dirBuilder.execute().actionGet();
        } catch (IndexNotFoundException ie) {
          System.out.println("Index not found... that's ok");
        }

        CreateIndexRequestBuilder cirBuilder = transportClient.admin().indices().prepareCreate(indexName);
        CreateIndexResponse ciResponse = cirBuilder.execute().actionGet();
        Assert.assertTrue(ciResponse.isAcknowledged(), "Create index succeeded");
      }

      @Override
      public void close()
          throws IOException {
        transportClientWriter.close();
      }
    };
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/WriterVariant.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/WriterVariant.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/WriterVariant.java new file mode 100644 index 0000000..581ec2e --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/WriterVariant.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.elasticsearch.writer; + +import java.io.IOException; + +import org.apache.gobblin.writer.BatchAsyncDataWriter; + +import com.typesafe.config.Config; + + +
/**
 * Describes one flavour of Elasticsearch writer so that the same tests can be run
 * generically against each of them.
 */
public interface WriterVariant {

  /**
   * @return the name of this variant
   */
  String getName();

  /**
   * @return a {@link ConfigBuilder} for this variant
   */
  ConfigBuilder getConfigBuilder();

  /**
   * @param config the writer configuration
   * @return the batch writer of this variant for the given configuration
   * @throws IOException if the writer cannot be created
   */
  BatchAsyncDataWriter getBatchAsyncDataWriter(Config config)
      throws IOException;

  /**
   * @param config the writer configuration
   * @return a {@link TestClient} for the given configuration
   * @throws IOException if the client cannot be created
   */
  TestClient getTestClient(Config config)
      throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/AvroRecordGenerator.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/AvroRecordGenerator.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/AvroRecordGenerator.java new file mode 100644 index 0000000..29433f3 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/AvroRecordGenerator.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.test; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.Collections; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.gobblin.elasticsearch.typemapping.AvroGenericRecordTypeMapper; + + +
/**
 * A generator of Avro records of type {@link GenericRecord}
 */
public class AvroRecordGenerator implements RecordTypeGenerator<GenericRecord> {
  @Override
  public String getName() {
    return "avro";
  }

  @Override
  public String getTypeMapperClassName() {
    return AvroGenericRecordTypeMapper.class.getCanonicalName();
  }

  @Override
  public GenericRecord getRecord(String id, PayloadType payloadType) {
    return getTestAvroRecord(id, payloadType);
  }

  /**
   * Builds a "TestRecord" with three fields: "id" (the identifier), "key" (a value whose
   * Avro type is chosen by {@code payloadType}) and "data" (a nested "Data" record holding
   * fixed bytes and a flags value of 0).
   *
   * @param identifier value stored in the "id" field
   * @param payloadType selects the schema and value of the "key" field
   * @return the populated record
   * @throws RuntimeException if {@code payloadType} is not one of the handled constants
   */
  static GenericRecord getTestAvroRecord(String identifier, PayloadType payloadType) {
    Schema dataRecordSchema =
        SchemaBuilder.record("Data").fields().name("data").type().bytesType().noDefault().name("flags").type().intType()
            .noDefault().endRecord();

    Schema schema;
    Object payloadValue;
    switch (payloadType) {
      case STRING: {
        schema = SchemaBuilder.record("TestRecord").fields()
            .name("id").type().stringType().noDefault()
            .name("key").type().stringType().noDefault()
            .name("data").type(dataRecordSchema).noDefault()
            .endRecord();
        payloadValue = TestUtils.generateRandomAlphaString(20);
        break;
      }
      case LONG: {
        schema = SchemaBuilder.record("TestRecord").fields()
            .name("id").type().stringType().noDefault()
            .name("key").type().longType().noDefault()
            .name("data").type(dataRecordSchema).noDefault()
            .endRecord();
        payloadValue = TestUtils.generateRandomLong();
        break;
      }
      case MAP: {
        schema = SchemaBuilder.record("TestRecord").fields()
            .name("id").type().stringType().noDefault()
            .name("key").type().map().values().stringType().noDefault()
            .name("data").type(dataRecordSchema).noDefault()
            .endRecord();
        // typed factory instead of the raw EMPTY_MAP constant
        payloadValue = Collections.emptyMap();
        break;
      }
      default: {
        throw new RuntimeException("Do not know how to handle this type of payload: " + payloadType);
      }
    }

    GenericData.Record testRecord = new GenericData.Record(schema);

    String testContent = "hello world";

    GenericData.Record dataRecord = new GenericData.Record(dataRecordSchema);
    dataRecord.put("data", ByteBuffer.wrap(testContent.getBytes(Charset.forName("UTF-8"))));
    dataRecord.put("flags", 0);

    testRecord.put("key", payloadValue);
    testRecord.put("data", dataRecord);
    testRecord.put("id", identifier);
    return testRecord;
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/JsonRecordGenerator.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/JsonRecordGenerator.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/JsonRecordGenerator.java new file mode 100644 index 0000000..7aea277 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/JsonRecordGenerator.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.test; + +import java.util.Collections; + +import org.apache.gobblin.elasticsearch.typemapping.JsonTypeMapper; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; + + +
/**
 * A generator of {@link JsonElement} records
 */
public class JsonRecordGenerator implements RecordTypeGenerator<JsonElement> {
  private final Gson gson = new Gson();

  @Override
  public String getName() {
    return "json";
  }

  @Override
  public String getTypeMapperClassName() {
    return JsonTypeMapper.class.getCanonicalName();
  }

  /**
   * Holder converted to JSON by {@link #getRecord}: an identifier plus a payload
   * stored under the field name {@code key}.
   *
   * @param <T> type of the payload
   */
  static class TestObject<T> {
    private final String id;
    private final T key;

    TestObject(String id, T payload) {
      this.id = id;
      this.key = payload;
    }
  }

  /**
   * Wraps the id and a payload of the requested kind in a {@link TestObject} and
   * converts it to a JSON tree.
   *
   * @param id identifier stored in the record
   * @param payloadType kind of payload to generate
   * @return the JSON tree of the generated object
   * @throws RuntimeException if {@code payloadType} is not one of the handled constants
   */
  @Override
  public JsonElement getRecord(String id, PayloadType payloadType) {
    Object testObject;
    switch (payloadType) {
      case STRING: {
        testObject = new TestObject<>(id, TestUtils.generateRandomAlphaString(20));
        break;
      }
      case LONG: {
        testObject = new TestObject<>(id, TestUtils.generateRandomLong());
        break;
      }
      case MAP: {
        // typed factory instead of the raw EMPTY_MAP constant
        testObject = new TestObject<>(id, Collections.emptyMap());
        break;
      }
      default:
        throw new RuntimeException("Do not know how to handle this type of payload");
    }
    return gson.toJsonTree(testObject);
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/PayloadType.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/PayloadType.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/PayloadType.java new file mode 100644 index 0000000..d793c47 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/PayloadType.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.test; + +
/**
 * The kinds of payload a test record can carry.
 * Tests pass one of these to a record generator to configure the record it builds.
 */
public enum PayloadType {
  /** a string payload */
  STRING,
  /** a long payload */
  LONG,
  /** a map payload */
  MAP
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/RecordTypeGenerator.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/RecordTypeGenerator.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/RecordTypeGenerator.java new file mode 100644 index 0000000..00c798f --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/RecordTypeGenerator.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.test; + +
/**
 * Describes a generator of test records of one record type.
 *
 * @param <T> the type of record produced
 */
public interface RecordTypeGenerator<T> {

  /**
   * @return the name of this record type
   */
  String getName();

  /**
   * @return the class name of a {@link org.apache.gobblin.elasticsearch.typemapping.TypeMapper}
   *         that can work with records of this type
   */
  String getTypeMapperClassName();

  /**
   * Generates a record with the provided characteristics.
   *
   * @param identifier the identifier to give the record
   * @param payloadType the kind of payload the record should carry
   * @return a record of the type T
   */
  T getRecord(String identifier, PayloadType payloadType);
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-eventhub/src/test/java/org/apache/gobblin/eventhub/writer/EventhubBatchTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-eventhub/src/test/java/org/apache/gobblin/eventhub/writer/EventhubBatchTest.java b/gobblin-modules/gobblin-eventhub/src/test/java/org/apache/gobblin/eventhub/writer/EventhubBatchTest.java index ab4bffa..9db3782 100644 --- a/gobblin-modules/gobblin-eventhub/src/test/java/org/apache/gobblin/eventhub/writer/EventhubBatchTest.java +++ b/gobblin-modules/gobblin-eventhub/src/test/java/org/apache/gobblin/eventhub/writer/EventhubBatchTest.java @@ -19,6 +19,8 @@ package org.apache.gobblin.eventhub.writer; import java.io.IOException; import org.apache.gobblin.writer.BytesBoundedBatch; +import org.apache.gobblin.writer.LargeMessagePolicy; +import org.apache.gobblin.writer.RecordTooLargeException; import org.testng.Assert; import org.testng.annotations.Test; @@ -28,41 +30,44 @@ import org.apache.gobblin.writer.WriteCallback; public class EventhubBatchTest { @Test - public void testBatchWithLargeRecord() throws IOException { + public void testBatchWithLargeRecord() + throws IOException, RecordTooLargeException { // Assume memory size has only 2 bytes BytesBoundedBatch batch = new BytesBoundedBatch(8, 
3000); String record = "abcdefgh"; // Record is larger than the memory size limit, the first append should fail - Assert.assertNull(batch.tryAppend(record, WriteCallback.EMPTY)); + Assert.assertNull(batch.tryAppend(record, WriteCallback.EMPTY, LargeMessagePolicy.DROP)); // The second append should still fail - Assert.assertNull(batch.tryAppend(record, WriteCallback.EMPTY)); + Assert.assertNull(batch.tryAppend(record, WriteCallback.EMPTY, LargeMessagePolicy.DROP)); } @Test - public void testBatch() throws IOException { + public void testBatch() + throws IOException, RecordTooLargeException { // Assume memory size has only 200 bytes BytesBoundedBatch batch = new BytesBoundedBatch(200, 3000); // Add additional 15 bytes overhead, total size is 23 bytes String record = "abcdefgh"; - Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY)); - Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY)); - Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY)); - Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY)); - Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY)); - Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY)); - Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY)); + LargeMessagePolicy policy = LargeMessagePolicy.DROP; + Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY, policy)); + Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY, policy)); + Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY, policy)); + Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY, policy)); + Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY, policy)); + Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY, policy)); + Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY, policy)); // Batch has room for 8th record - Assert.assertEquals(batch.hasRoom(record), true); - 
Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY)); + Assert.assertEquals(batch.hasRoom(record, policy), true); + Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY, policy)); // Batch has no room for 9th record - Assert.assertEquals(batch.hasRoom(record), false); - Assert.assertNull(batch.tryAppend(record, WriteCallback.EMPTY)); + Assert.assertEquals(batch.hasRoom(record, policy), false); + Assert.assertNull(batch.tryAppend(record, WriteCallback.EMPTY, policy)); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-test-utils/src/main/java/org/apache/gobblin/test/TestUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-test-utils/src/main/java/org/apache/gobblin/test/TestUtils.java b/gobblin-test-utils/src/main/java/org/apache/gobblin/test/TestUtils.java index a583198..68c79e9 100644 --- a/gobblin-test-utils/src/main/java/org/apache/gobblin/test/TestUtils.java +++ b/gobblin-test-utils/src/main/java/org/apache/gobblin/test/TestUtils.java @@ -45,6 +45,27 @@ public class TestUtils { return messageBytes; } + private static final char[] alphas = new char[26]; + + public static Long generateRandomLong() { + return rng.nextLong(); + } + + static { + char ch = 'a'; + for (int i = 0; i < 26; i++) { + alphas[i] = ch++; + } + } + + public static String generateRandomAlphaString(int stringLength) { + char[] newString = new char[stringLength]; + for (int i = 0; i < stringLength; ++i) + { + newString[i] = alphas[rng.nextInt(26)]; + } + return new String(newString); + } /** * TODO: Currently generates a static schema avro record. 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gradle/scripts/globalDependencies.gradle ---------------------------------------------------------------------- diff --git a/gradle/scripts/globalDependencies.gradle b/gradle/scripts/globalDependencies.gradle index d1d2e03..d64db67 100644 --- a/gradle/scripts/globalDependencies.gradle +++ b/gradle/scripts/globalDependencies.gradle @@ -23,20 +23,22 @@ subprojects { configurations { compile dependencies { - compile(externalDependency.hadoopCommon) { - exclude module: 'servlet-api' - } - compile externalDependency.hadoopClientCore - compile externalDependency.hadoopAnnotations - if (project.name.equals('gobblin-runtime') || project.name.equals('gobblin-test')) { - compile externalDependency.hadoopClientCommon - } - compile(externalDependency.guava) { - force = true - } - compile(externalDependency.commonsCodec) { - force = true - } + if (!project.name.contains('gobblin-elasticsearch-deps')) { + compile(externalDependency.hadoopCommon) { + exclude module: 'servlet-api' + } + compile externalDependency.hadoopClientCore + compile externalDependency.hadoopAnnotations + if (project.name.equals('gobblin-runtime') || project.name.equals('gobblin-test')) { + compile externalDependency.hadoopClientCommon + } + compile(externalDependency.guava) { + force = true + } + } + compile(externalDependency.commonsCodec) { + force = true + } // Required to add JDK's tool jar, which is required to run byteman tests. testCompile (files(((URLClassLoader) ToolProvider.getSystemToolClassLoader()).getURLs()))
