[GOBBLIN-17] Add Elasticsearch writer (rest + transport) Closes #2419 from shirshanka/elastic
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f1bc746c Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f1bc746c Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f1bc746c Branch: refs/heads/master Commit: f1bc746ca50cffa1247c00b6c5bdd34b7321198d Parents: ef438c8 Author: Shirshanka Das <[email protected]> Authored: Fri Aug 31 22:31:21 2018 -0700 Committer: Abhishek Tiwari <[email protected]> Committed: Fri Aug 31 22:31:45 2018 -0700 ---------------------------------------------------------------------- .../gobblin/writer/AsyncWriterManager.java | 3 + .../java/org/apache/gobblin/writer/Batch.java | 22 +- .../gobblin/writer/BufferedAsyncDataWriter.java | 4 +- .../gobblin/writer/BytesBoundedBatch.java | 10 +- .../gobblin/writer/LargeMessagePolicy.java | 26 ++ .../gobblin/writer/RecordTooLargeException.java | 20 ++ .../writer/SequentialBasedBatchAccumulator.java | 65 +++-- .../gobblin-flavor-standard.gradle | 1 + .../src/main/resources/wikipedia-elastic.conf | 64 +++++ .../gobblin-elasticsearch-deps/build.gradle | 50 ++++ .../gobblin-elasticsearch/build.gradle | 76 ++++++ .../scripts/install_test_deps.sh | 40 +++ .../scripts/uninstall_test_deps.sh | 23 ++ .../AvroGenericRecordSerializer.java | 80 ++++++ .../AvroGenericRecordTypeMapper.java | 71 ++++++ .../typemapping/FieldMappingException.java | 35 +++ .../typemapping/GsonJsonSerializer.java | 52 ++++ .../typemapping/JsonSerializer.java | 30 +++ .../typemapping/JsonTypeMapper.java | 56 +++++ .../typemapping/SerializationException.java | 31 +++ .../elasticsearch/typemapping/TypeMapper.java | 36 +++ .../writer/ElasticsearchDataWriterBuilder.java | 83 +++++++ .../writer/ElasticsearchRestWriter.java | 232 ++++++++++++++++++ .../ElasticsearchTransportClientWriter.java | 118 +++++++++ .../writer/ElasticsearchWriterBase.java | 168 +++++++++++++ .../ElasticsearchWriterConfigurationKeys.java | 71 
++++++ .../elasticsearch/writer/ExceptionLogger.java | 26 ++ .../writer/FutureCallbackHolder.java | 193 +++++++++++++++ .../writer/MalformedDocPolicy.java | 26 ++ .../elasticsearch/ElasticsearchTestServer.java | 217 +++++++++++++++++ .../ElasticsearchTestServerTest.java | 50 ++++ .../elasticsearch/writer/ConfigBuilder.java | 72 ++++++ .../ElasticsearchTransportClientWriterTest.java | 54 +++++ .../writer/ElasticsearchWriterBaseTest.java | 113 +++++++++ .../ElasticsearchWriterIntegrationTest.java | 243 +++++++++++++++++++ .../elasticsearch/writer/RestWriterVariant.java | 97 ++++++++ .../elasticsearch/writer/TestClient.java | 37 +++ .../writer/TransportWriterVariant.java | 96 ++++++++ .../elasticsearch/writer/WriterVariant.java | 40 +++ .../gobblin/test/AvroRecordGenerator.java | 104 ++++++++ .../gobblin/test/JsonRecordGenerator.java | 75 ++++++ .../org/apache/gobblin/test/PayloadType.java | 27 +++ .../gobblin/test/RecordTypeGenerator.java | 43 ++++ .../eventhub/writer/EventhubBatchTest.java | 35 +-- .../java/org/apache/gobblin/test/TestUtils.java | 21 ++ gradle/scripts/globalDependencies.gradle | 30 +-- 46 files changed, 3005 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java ---------------------------------------------------------------------- diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java index 2be89c6..a599753 100644 --- a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java +++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java @@ -340,6 +340,8 @@ public class AsyncWriterManager<D> implements WatermarkAwareWriter<D>, DataWrite .update(currTime - 
attempt.getPrevAttemptTimestampNanos(), TimeUnit.NANOSECONDS); } if (attempt.attemptNum <= AsyncWriterManager.this.numRetries) { // attempts must == numRetries + 1 + log.debug("Attempt {} had failure: {}; re-enqueueing record: {}", attempt.attemptNum, throwable.getMessage(), + attempt.getRecord().toString()); attempt.incAttempt(); attempt.setPrevAttemptFailure(throwable); AsyncWriterManager.this.retryQueue.get().add(attempt); @@ -391,6 +393,7 @@ public class AsyncWriterManager<D> implements WatermarkAwareWriter<D>, DataWrite Attempt attempt = this.retryQueue.take(); if (attempt != null) { maybeSleep(attempt.getPrevAttemptTimestampNanos()); + log.debug("Retry thread will retry record: {}", attempt.getRecord().toString()); attemptWrite(attempt); } } catch (InterruptedException e) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-core-base/src/main/java/org/apache/gobblin/writer/Batch.java ---------------------------------------------------------------------- diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/Batch.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/Batch.java index ff16590..faf815c 100644 --- a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/Batch.java +++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/Batch.java @@ -127,17 +127,18 @@ public abstract class Batch<D>{ /** * A method to check if the batch has the room to add a new record * @param record: record needs to be added + * @param largeMessagePolicy: the policy that is in effect for large messages * @return Indicates if this batch still have enough space to hold a new record */ - public abstract boolean hasRoom (D record); + public abstract boolean hasRoom (D record, LargeMessagePolicy largeMessagePolicy); /** * Add a record to this batch * <p> * Implementation of this method should always ensure the record can be added successfully - * The contract between {@link Batch#tryAppend(Object, WriteCallback)} and 
this method is this method + * The contract between {@link Batch#tryAppend(Object, WriteCallback, LargeMessagePolicy)} and this method is this method * is responsible for adding record to internal batch memory and the check for the room space is performed - * by {@link Batch#hasRoom(Object)}. All the potential issues for adding a record should + * by {@link Batch#hasRoom(Object, LargeMessagePolicy)}. All the potential issues for adding a record should * already be resolved before this method is invoked. * </p> * @@ -162,14 +163,19 @@ public abstract class Batch<D>{ * * @param record : record needs to be added * @param callback : A callback which will be invoked when the whole batch gets sent and acknowledged + * @param largeMessagePolicy : the {@link LargeMessagePolicy} that is in effect for this batch * @return A future object which contains {@link RecordMetadata} */ - public Future<RecordMetadata> tryAppend(D record, WriteCallback callback) { - if (!hasRoom(record)) { - LOG.debug ("Cannot add " + record + " to previous batch because the batch already has " + getCurrentSizeInByte() + " bytes"); + public Future<RecordMetadata> tryAppend(D record, WriteCallback callback, LargeMessagePolicy largeMessagePolicy) + throws RecordTooLargeException { + if (!hasRoom(record, largeMessagePolicy)) { + LOG.debug ("Cannot add {} to previous batch because the batch already has {} bytes", + record.toString(), getCurrentSizeInByte()); + if (largeMessagePolicy == LargeMessagePolicy.FAIL) { + throw new RecordTooLargeException(); + } return null; } - this.append(record); thunks.add(new Thunk(callback, getRecordSizeInByte(record))); RecordFuture future = new RecordFuture(latch, recordCount); @@ -178,7 +184,9 @@ public abstract class Batch<D>{ } public void await() throws InterruptedException{ + LOG.debug("Batch {} waiting for {} records", this.id, this.recordCount); this.latch.await(); + LOG.debug("Batch {} done with {} records", this.id, this.recordCount); } } 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BufferedAsyncDataWriter.java ---------------------------------------------------------------------- diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BufferedAsyncDataWriter.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BufferedAsyncDataWriter.java index ceaffec..87039b6 100644 --- a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BufferedAsyncDataWriter.java +++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BufferedAsyncDataWriter.java @@ -39,7 +39,7 @@ import org.apache.gobblin.annotation.Alpha; * @param <D> data record type */ @Alpha -public abstract class BufferedAsyncDataWriter<D> implements AsyncDataWriter<D> { +public class BufferedAsyncDataWriter<D> implements AsyncDataWriter<D> { private RecordProcessor<D> processor; private BatchAccumulator<D> accumulator; @@ -136,7 +136,7 @@ public abstract class BufferedAsyncDataWriter<D> implements AsyncDataWriter<D> { return new WriteCallback<Object>() { @Override public void onSuccess(WriteResponse writeResponse) { - LOG.info ("Batch " + batch.getId() + " is on success with size " + batch.getCurrentSizeInByte() + " num of record " + batch.getRecords().size()); + LOG.debug ("Batch " + batch.getId() + " is on success with size " + batch.getCurrentSizeInByte() + " num of record " + batch.getRecords().size()); batch.onSuccess(writeResponse); batch.done(); accumulator.deallocate(batch); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BytesBoundedBatch.java ---------------------------------------------------------------------- diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BytesBoundedBatch.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BytesBoundedBatch.java index ef63882..7b6b4dc 100644 --- 
a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BytesBoundedBatch.java +++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BytesBoundedBatch.java @@ -62,7 +62,11 @@ public class BytesBoundedBatch<D> extends Batch<D>{ records.add(record); } - boolean hasRoom (D record) { + boolean hasRoom (D record, LargeMessagePolicy largeMessagePolicy) { + if (records.isEmpty() && largeMessagePolicy == LargeMessagePolicy.ATTEMPT) { + // there is always space for one record, no matter how big :) + return true; + } long recordLen = BytesBoundedBatch.this.getInternalSize(record); return (byteSize + recordLen) <= BytesBoundedBatch.this.memSizeLimit; } @@ -80,8 +84,8 @@ public class BytesBoundedBatch<D> extends Batch<D>{ return memory.getRecords(); } - public boolean hasRoom (D object) { - return memory.hasRoom(object); + public boolean hasRoom (D object, LargeMessagePolicy largeMessagePolicy) { + return memory.hasRoom(object, largeMessagePolicy); } public void append (D object) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-core-base/src/main/java/org/apache/gobblin/writer/LargeMessagePolicy.java ---------------------------------------------------------------------- diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/LargeMessagePolicy.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/LargeMessagePolicy.java new file mode 100644 index 0000000..28ca949 --- /dev/null +++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/LargeMessagePolicy.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.writer; + +/** + * Describes how single messages that are larger than a batch message limit should be treated + */ +public enum LargeMessagePolicy { + DROP, // drop (and log) messages that exceed the threshold + ATTEMPT, // attempt to deliver messages that exceed the threshold + FAIL // throw an error when this happens +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-core-base/src/main/java/org/apache/gobblin/writer/RecordTooLargeException.java ---------------------------------------------------------------------- diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/RecordTooLargeException.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/RecordTooLargeException.java new file mode 100644 index 0000000..845e6a8 --- /dev/null +++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/RecordTooLargeException.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.writer; + +public class RecordTooLargeException extends Exception { +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-core-base/src/main/java/org/apache/gobblin/writer/SequentialBasedBatchAccumulator.java ---------------------------------------------------------------------- diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/SequentialBasedBatchAccumulator.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/SequentialBasedBatchAccumulator.java index 9b7a608..58b0942 100644 --- a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/SequentialBasedBatchAccumulator.java +++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/SequentialBasedBatchAccumulator.java @@ -43,14 +43,16 @@ import org.apache.gobblin.util.ConfigUtils; * keeps in the deque until a TTL is expired. 
*/ -public abstract class SequentialBasedBatchAccumulator<D> extends BatchAccumulator<D> { +public class SequentialBasedBatchAccumulator<D> extends BatchAccumulator<D> { + private static final LargeMessagePolicy DEFAULT_LARGE_MESSAGE_POLICY = LargeMessagePolicy.FAIL; private Deque<BytesBoundedBatch<D>> dq = new LinkedList<>(); private IncompleteRecordBatches incomplete = new IncompleteRecordBatches(); private final long batchSizeLimit; private final long memSizeLimit; private final double tolerance = 0.95; private final long expireInMilliSecond; + private final LargeMessagePolicy largeMessagePolicy; private static final Logger LOG = LoggerFactory.getLogger(SequentialBasedBatchAccumulator.class); private final ReentrantLock dqLock = new ReentrantLock(); @@ -63,24 +65,31 @@ public abstract class SequentialBasedBatchAccumulator<D> extends BatchAccumulato } public SequentialBasedBatchAccumulator(Properties properties) { - Config config = ConfigUtils.propertiesToConfig(properties); - this.batchSizeLimit = ConfigUtils.getLong(config, Batch.BATCH_SIZE, - Batch.BATCH_SIZE_DEFAULT); - - this.expireInMilliSecond = ConfigUtils.getLong(config, Batch.BATCH_TTL, - Batch.BATCH_TTL_DEFAULT); - - this.capacity = ConfigUtils.getLong(config, Batch.BATCH_QUEUE_CAPACITY, - Batch.BATCH_QUEUE_CAPACITY_DEFAULT); + this(ConfigUtils.propertiesToConfig(properties)); + } - this.memSizeLimit = (long) (this.tolerance * this.batchSizeLimit); + public SequentialBasedBatchAccumulator(Config config) { + this(ConfigUtils.getLong(config, Batch.BATCH_SIZE, + Batch.BATCH_SIZE_DEFAULT), + ConfigUtils.getLong(config, Batch.BATCH_TTL, + Batch.BATCH_TTL_DEFAULT), + ConfigUtils.getLong(config, Batch.BATCH_QUEUE_CAPACITY, + Batch.BATCH_QUEUE_CAPACITY_DEFAULT)); } public SequentialBasedBatchAccumulator(long batchSizeLimit, long expireInMilliSecond, long capacity) { + this(batchSizeLimit, expireInMilliSecond, capacity, DEFAULT_LARGE_MESSAGE_POLICY); + } + + public SequentialBasedBatchAccumulator(long 
batchSizeLimit, + long expireInMilliSecond, + long capacity, + LargeMessagePolicy largeMessagePolicy) { this.batchSizeLimit = batchSizeLimit; this.expireInMilliSecond = expireInMilliSecond; this.capacity = capacity; this.memSizeLimit = (long) (this.tolerance * this.batchSizeLimit); + this.largeMessagePolicy = largeMessagePolicy; } public long getNumOfBatches () { @@ -101,7 +110,12 @@ public abstract class SequentialBasedBatchAccumulator<D> extends BatchAccumulato try { BytesBoundedBatch last = dq.peekLast(); if (last != null) { - Future<RecordMetadata> future = last.tryAppend(record, callback); + Future<RecordMetadata> future = null; + try { + future = last.tryAppend(record, callback, this.largeMessagePolicy); + } catch (RecordTooLargeException e) { + // Ok if the record was too large for the current batch + } if (future != null) { return future; } @@ -110,12 +124,18 @@ public abstract class SequentialBasedBatchAccumulator<D> extends BatchAccumulato // Create a new batch because previous one has no space BytesBoundedBatch batch = new BytesBoundedBatch(this.memSizeLimit, this.expireInMilliSecond); LOG.debug("Batch " + batch.getId() + " is generated"); - Future<RecordMetadata> future = batch.tryAppend(record, callback); + Future<RecordMetadata> future = null; + try { + future = batch.tryAppend(record, callback, this.largeMessagePolicy); + } catch (RecordTooLargeException e) { + // If a new batch also wasn't able to accommodate the new message + throw new RuntimeException("Failed due to a message that was too large", e); + } - // Even single record can exceed the batch size limit - // Ignore the record because Eventhub can only accept payload less than 256KB + // The future might be null, since the largeMessagePolicy might be set to DROP if (future == null) { - LOG.error("Batch " + batch.getId() + " is marked as complete because it contains a huge record: " + assert largeMessagePolicy.equals(LargeMessagePolicy.DROP); + LOG.error("Batch " + batch.getId() + " is silently 
marked as complete, dropping a huge record: " + record); future = Futures.immediateFuture(new RecordMetadata(0)); callback.onSuccess(WriteResponse.EMPTY); @@ -124,6 +144,7 @@ public abstract class SequentialBasedBatchAccumulator<D> extends BatchAccumulato // if queue is full, we should not add more while (dq.size() >= this.capacity) { + LOG.debug("Accumulator size {} is greater than capacity {}, waiting", dq.size(), this.capacity); this.notFull.await(); } dq.addLast(batch); @@ -187,7 +208,7 @@ public abstract class SequentialBasedBatchAccumulator<D> extends BatchAccumulato return dq.poll(); } else { while (dq.size() == 0) { - LOG.info ("ready to sleep because of queue is empty"); + LOG.debug ("ready to sleep because of queue is empty"); SequentialBasedBatchAccumulator.this.notEmpty.await(); if (SequentialBasedBatchAccumulator.this.isClosed()) { return dq.poll(); @@ -203,7 +224,7 @@ public abstract class SequentialBasedBatchAccumulator<D> extends BatchAccumulato if (dq.size() == 1) { if (dq.peekFirst().isTTLExpire()) { - LOG.info ("Batch " + dq.peekFirst().getId() + " is expired"); + LOG.debug ("Batch " + dq.peekFirst().getId() + " is expired"); BytesBoundedBatch candidate = dq.poll(); SequentialBasedBatchAccumulator.this.notFull.signal(); return candidate; @@ -240,12 +261,16 @@ public abstract class SequentialBasedBatchAccumulator<D> extends BatchAccumulato public void flush() { try { ArrayList<Batch> batches = this.incomplete.all(); - LOG.info ("flush on {} batches", batches.size()); + int numOutstandingRecords = 0; + for (Batch batch: batches) { + numOutstandingRecords += batch.getRecords().size(); + } + LOG.debug ("Flush called on {} batches with {} records total", batches.size(), numOutstandingRecords); for (Batch batch: batches) { batch.await(); } } catch (Exception e) { - LOG.info ("Error happens when flushing"); + LOG.error ("Error happened while flushing batches"); } } 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-distribution/gobblin-flavor-standard.gradle ---------------------------------------------------------------------- diff --git a/gobblin-distribution/gobblin-flavor-standard.gradle b/gobblin-distribution/gobblin-flavor-standard.gradle index c2061a5..2f544ca 100644 --- a/gobblin-distribution/gobblin-flavor-standard.gradle +++ b/gobblin-distribution/gobblin-flavor-standard.gradle @@ -21,4 +21,5 @@ dependencies { compile project(':gobblin-modules:gobblin-crypto-provider') compile project(':gobblin-modules:gobblin-kafka-08') compile project(':gobblin-modules:google-ingestion') + compile project(':gobblin-modules:gobblin-elasticsearch') } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-example/src/main/resources/wikipedia-elastic.conf ---------------------------------------------------------------------- diff --git a/gobblin-example/src/main/resources/wikipedia-elastic.conf b/gobblin-example/src/main/resources/wikipedia-elastic.conf new file mode 100644 index 0000000..9db386e --- /dev/null +++ b/gobblin-example/src/main/resources/wikipedia-elastic.conf @@ -0,0 +1,64 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +# A sample pull file that pulls page revisions from Wikipedia and writes them +# to Elasticsearch +job { + name=PullFromWikipediaToElasticSearch + group=Wikipedia + description=Pull from Wikipedia and write to ElasticSearch +} + +task.maxretries=0 + +source { + class=org.apache.gobblin.example.wikipedia.WikipediaSource + page.titles="Wikipedia:Sandbox" + revisions.cnt=5 +} + +wikipedia { + api.rooturl="https://en.wikipedia.org/w/api.php" + avro.schema="{\"namespace\": \"example.wikipedia.avro\",\"type\": \"record\",\"name\": \"WikipediaArticle\",\"fields\": [{\"name\": \"revid\", \"type\": [\"double\", \"null\"]},{\"name\": \"pageid\", \"type\": [\"double\", \"null\"]},{\"name\": \"title\", \"type\": [\"string\", \"null\"]},{\"name\": \"user\", \"type\": [\"string\", \"null\"]},{\"name\": \"anon\", \"type\": [\"string\", \"null\"]},{\"name\": \"userid\", \"type\": [\"double\", \"null\"]},{\"name\": \"timestamp\", \"type\": [\"string\", \"null\"]},{\"name\": \"size\", \"type\": [\"double\", \"null\"]},{\"name\": \"contentformat\", \"type\": [\"string\", \"null\"]},{\"name\": \"contentmodel\", \"type\": [\"string\", \"null\"]},{\"name\": \"content\", \"type\": [\"string\", \"null\"]}]}" +} +converter.classes=org.apache.gobblin.example.wikipedia.WikipediaConverter +extract.namespace=org.apache.gobblin.example.wikipedia + +writer { + builder.class=org.apache.gobblin.elasticsearch.writer.ElasticsearchDataWriterBuilder + elasticsearch { + client.type=REST + index.name=wikipedia-test + index.type=docs + #hosts=hostname + #ssl { + # enabled=true + # keystoreType=pkcs12 + # keystorePassword=change_me + # keystoreLocation=/path/to/.p12 file + # truststoreType=jks + # truststoreLocation=/path/to/cacerts + # truststorePassword=changeme + #} + typeMapperClass=org.apache.gobblin.elasticsearch.typemapping.AvroGenericRecordTypeMapper + useIdFromData=false # change to true if you want to use a field from the record as the id field + #idFieldName=id # change to the 
field of the record that you want to use as the id of the document + } +} + +data.publisher.type=org.apache.gobblin.publisher.NoopPublisher + http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch-deps/build.gradle ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-elasticsearch-deps/build.gradle b/gobblin-modules/gobblin-elasticsearch-deps/build.gradle new file mode 100644 index 0000000..35e9a36 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch-deps/build.gradle @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +buildscript { + repositories { + jcenter() + } + dependencies { + classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4' + } +} + +apply plugin: 'com.github.johnrengelman.shadow' +apply plugin: 'java' + +dependencies { + compile "org.elasticsearch.client:transport:5.6.8" + compile "org.elasticsearch.client:elasticsearch-rest-high-level-client:5.6.8" + compile "com.google.guava:guava:18.0" +} + + +configurations { + compile { + exclude group: "org.apache.hadoop" + exclude group: "com.sun.jersey.contribs" + } +} + +shadowJar { + zip64 true + relocate 'com.google.common', 'shadow.gobblin.elasticsearch.com.google.common' +} + +ext.classification="library" http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/build.gradle ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-elasticsearch/build.gradle b/gobblin-modules/gobblin-elasticsearch/build.gradle new file mode 100644 index 0000000..2d624b2 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/build.gradle @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +apply plugin: 'java' + +dependencies { + compile project(":gobblin-api") + compile project(":gobblin-core-base") + compile project(":gobblin-utility") + compile project(":gobblin-metrics-libs:gobblin-metrics") + compile project(path: ":gobblin-modules:gobblin-elasticsearch-deps", configuration:"shadow") + + compile "org.apache.logging.log4j:log4j-to-slf4j:2.7" + compile "org.slf4j:slf4j-api:1.7.21" + compile externalDependency.avro + compile externalDependency.jacksonCore + compile externalDependency.jacksonMapper + compile externalDependency.commonsHttpClient + compile externalDependency.commonsPool + compile externalDependency.commonsLang3 + compile externalDependency.slf4j + compile externalDependency.httpclient + compile externalDependency.httpcore + compile externalDependency.lombok + compile externalDependency.metricsCore + compile externalDependency.typesafeConfig + compile externalDependency.findBugsAnnotations + + testCompile project(":gobblin-runtime") + testCompile project(":gobblin-test-utils") + testCompile externalDependency.jsonAssert + testCompile externalDependency.mockito + testCompile externalDependency.testng +} + +task installTestDependencies(type:Exec) { + workingDir "${project.rootDir}/gobblin-modules/gobblin-elasticsearch/" + commandLine './scripts/install_test_deps.sh' +} + +task uninstallTestDependencies(type: Exec) { + workingDir "${project.rootDir}/gobblin-modules/gobblin-elasticsearch/" + commandLine './scripts/uninstall_test_deps.sh' + +} + +test.dependsOn installTestDependencies +test.finalizedBy uninstallTestDependencies + +configurations { + compile { + transitive = false + } +} + +test { + workingDir rootProject.rootDir + maxParallelForks = 4 +} + + +ext.classification="library" http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/scripts/install_test_deps.sh ---------------------------------------------------------------------- diff --git 
a/gobblin-modules/gobblin-elasticsearch/scripts/install_test_deps.sh b/gobblin-modules/gobblin-elasticsearch/scripts/install_test_deps.sh new file mode 100755 index 0000000..48324da --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/scripts/install_test_deps.sh @@ -0,0 +1,40 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +TARGET_DIR="test-elasticsearch" +ES_VERSION=5.6.8 +ES_DIR=${TARGET_DIR}/elasticsearch-${ES_VERSION} +echo ${TARGET_DIR} +mkdir -p ${TARGET_DIR} + + +ES_TAR=${TARGET_DIR}/elasticsearch-${ES_VERSION}.tar.gz +ES_URL=https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-${ES_VERSION}.tar.gz +echo ${ES_URL} +echo ${ES_TAR} +if [ -d $ES_DIR ]; +then + echo "Skipping download since version already found at ${ES_DIR}" + echo "Cleaning up directory" + rm -rf ${TARGET_DIR}/elasticsearch-${ES_VERSION} +else + echo "$ES_DIR does not exist, downloading" + curl -o ${ES_TAR} ${ES_URL} +fi +tar -xzf ${ES_TAR} -C ${TARGET_DIR} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/scripts/uninstall_test_deps.sh ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-elasticsearch/scripts/uninstall_test_deps.sh b/gobblin-modules/gobblin-elasticsearch/scripts/uninstall_test_deps.sh new file mode 100755 index 0000000..db79f86 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/scripts/uninstall_test_deps.sh @@ -0,0 +1,23 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
#

# Removes the test Elasticsearch directory and everything inside it.
TARGET_DIR="test-elasticsearch"
rm -rf "${TARGET_DIR}"
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordSerializer.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordSerializer.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordSerializer.java new file mode 100644 index 0000000..5242202 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordSerializer.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.elasticsearch.typemapping; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.charset.Charset; + +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.io.output.ByteArrayOutputStream; + +import com.google.common.io.Closer; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + + +/** + * A {@link JsonSerializer} for {@link GenericRecord} objects. + */ +@Slf4j +public class AvroGenericRecordSerializer implements JsonSerializer<GenericRecord> { + + private final ByteArrayOutputStream byteArrayOutputStream; + private final DataOutputStream out; + private final GenericDatumWriter<GenericRecord> writer; + private final Closer closer; + + + public AvroGenericRecordSerializer() { + this.closer =Closer.create(); + this.byteArrayOutputStream = new ByteArrayOutputStream(); + this.out = this.closer.register(new DataOutputStream(this.byteArrayOutputStream)); + this.writer = new GenericDatumWriter<GenericRecord>(); + } + + @Override + public void configure(Config config) { + + } + + @Override + public synchronized byte[] serializeToJson(GenericRecord serializable) + throws SerializationException { + try { + /** + * We use the toString method of Avro to flatten the JSON for optional nullable types. + * Otherwise the JSON has an additional level of nesting to encode the type. + * e.g. "id": {"string": "id-value"} versus "id": "id-value" + * See {@link: https://issues.apache.org/jira/browse/AVRO-1582} for a good discussion on this. 
+ */ + String serialized = serializable.toString(); + return serialized.getBytes(Charset.forName("UTF-8")); + + } catch (Exception exception) { + throw new SerializationException("Could not serializeToJson Avro record", exception); + } + } + + @Override + public void close() + throws IOException { + this.closer.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordTypeMapper.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordTypeMapper.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordTypeMapper.java new file mode 100644 index 0000000..0586f3c --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordTypeMapper.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.elasticsearch.typemapping; + +import java.io.IOException; + +import org.apache.avro.generic.GenericRecord; + +import com.google.common.io.Closer; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + + +/** + * A TypeMapper for Avro GenericRecords. + */ +@Slf4j +public class AvroGenericRecordTypeMapper implements TypeMapper<GenericRecord> { + + private final JsonSerializer<GenericRecord> serializer; + private final Closer closer; + + public AvroGenericRecordTypeMapper() { + this.closer =Closer.create(); + this.serializer = this.closer.register(new AvroGenericRecordSerializer()); + } + + @Override + public void configure(Config config) { + this.serializer.configure(config); + log.info("AvroGenericRecordTypeMapper successfully configured"); + } + + @Override + public JsonSerializer<GenericRecord> getSerializer() { + return this.serializer; + } + + @Override + public String getValue(String fieldName, GenericRecord record) + throws FieldMappingException { + try { + Object idValue = record.get(fieldName); + return idValue.toString(); + } + catch (Exception e) { + throw new FieldMappingException("Could not find field " + fieldName, e); + } + } + + @Override + public void close() + throws IOException { + this.closer.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/FieldMappingException.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/FieldMappingException.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/FieldMappingException.java new file mode 100644 index 0000000..781f918 --- /dev/null +++ 
b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/FieldMappingException.java @@ -0,0 +1,35 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.gobblin.elasticsearch.typemapping;

/**
 * Checked exception signalling that a field could not be read from a record
 * during field-based access.
 */
public class FieldMappingException extends Exception {

  /**
   * @param message description of the mapping failure
   */
  public FieldMappingException(String message) {
    super(message);
  }

  /**
   * @param message description of the mapping failure
   * @param e the underlying cause
   */
  public FieldMappingException(String message, Exception e) {
    super(message, e);
  }

  /**
   * @param e the underlying cause
   */
  public FieldMappingException(Exception e) {
    super(e);
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/GsonJsonSerializer.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/GsonJsonSerializer.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/GsonJsonSerializer.java new file mode 100644 index 0000000..d44986c --- /dev/null +++ 
b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/GsonJsonSerializer.java @@ -0,0 +1,52 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.gobblin.elasticsearch.typemapping;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;

import com.google.gson.Gson;
import com.typesafe.config.Config;


/**
 * A Gson based Json Serializer
 */
public class GsonJsonSerializer implements JsonSerializer<Object> {
  private final Gson _gson = new Gson();

  /** No configuration is read by this serializer. */
  @Override
  public void configure(Config config) {

  }

  /**
   * Renders the object with Gson and encodes the resulting JSON text as UTF-8.
   *
   * @param serializable the object to serialize
   * @return the UTF-8 bytes of the JSON text
   * @throws SerializationException declared by the interface; not thrown by this implementation
   */
  @Override
  public byte[] serializeToJson(Object serializable)
      throws SerializationException {
    String jsonString = _gson.toJson(serializable);
    // The Charset overload cannot fail, unlike the charset-name overload which
    // forces handling of an UnsupportedEncodingException that can never occur for UTF-8.
    return jsonString.getBytes(StandardCharsets.UTF_8);
  }

  /** Nothing to release. */
  @Override
  public void close()
      throws IOException {
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonSerializer.java ---------------------------------------------------------------------- diff --git 
a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonSerializer.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonSerializer.java new file mode 100644 index 0000000..41f2885 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonSerializer.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.elasticsearch.typemapping; + +import java.io.Closeable; + +import com.typesafe.config.Config; + + +public interface JsonSerializer<T> extends Closeable { + + void configure(Config config); + + byte[] serializeToJson(T serializable) throws SerializationException; + +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonTypeMapper.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonTypeMapper.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonTypeMapper.java new file mode 100644 index 0000000..8491147 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonTypeMapper.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.elasticsearch.typemapping; + +import java.io.IOException; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.typesafe.config.Config; + + +public class JsonTypeMapper implements TypeMapper<JsonElement> { + + private final JsonSerializer serializer = new GsonJsonSerializer(); + @Override + public void configure(Config config) { + + } + + @Override + public JsonSerializer<JsonElement> getSerializer() { + return serializer; + } + + @Override + public String getValue(String fieldName, JsonElement record) + throws FieldMappingException { + assert record.isJsonObject(); + JsonObject jsonObject = record.getAsJsonObject(); + if (jsonObject.has(fieldName)) { + return jsonObject.get(fieldName).getAsString(); + } else { + throw new FieldMappingException("Could not find field :" + fieldName); + } + } + + @Override + public void close() + throws IOException { + + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/SerializationException.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/SerializationException.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/SerializationException.java new file mode 100644 index 0000000..d2edb53 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/SerializationException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.elasticsearch.typemapping; + +/** + * A class to hold exceptions thrown by {@link JsonSerializer}s. + */ +public class SerializationException extends Exception { + public SerializationException(Exception e) { + super(e); + } + + public SerializationException(String s, Exception exception) { + super(s, exception); + } +} + http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/TypeMapper.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/TypeMapper.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/TypeMapper.java new file mode 100644 index 0000000..5aa909b --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/TypeMapper.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.elasticsearch.typemapping; + +import java.io.Closeable; + +import com.typesafe.config.Config; + + +/** + * An interface that enables the ElasticSearch writer to work with different types of records. + * Supports serialization and id-getter capabilities + */ +public interface TypeMapper<T> extends Closeable { + + void configure(Config config); + + JsonSerializer<T> getSerializer(); + + String getValue(String fieldName, T record) throws FieldMappingException; + +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchDataWriterBuilder.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchDataWriterBuilder.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchDataWriterBuilder.java new file mode 100644 index 0000000..cb6ed15 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchDataWriterBuilder.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.elasticsearch.writer; + +import java.io.IOException; +import java.util.Properties; + +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.writer.AsyncWriterManager; +import org.apache.gobblin.writer.BatchAsyncDataWriter; +import org.apache.gobblin.writer.BufferedAsyncDataWriter; +import org.apache.gobblin.writer.DataWriter; +import org.apache.gobblin.writer.DataWriterBuilder; +import org.apache.gobblin.writer.SequentialBasedBatchAccumulator; + +import com.google.gson.JsonObject; +import com.typesafe.config.Config; + +import org.apache.gobblin.configuration.State; + +public class ElasticsearchDataWriterBuilder extends DataWriterBuilder { + + @Override + public DataWriter build() throws IOException { + + State state = this.destination.getProperties(); + Properties taskProps = state.getProperties(); + Config config = ConfigUtils.propertiesToConfig(taskProps); + + SequentialBasedBatchAccumulator<JsonObject> batchAccumulator = new SequentialBasedBatchAccumulator<>(taskProps); + + BatchAsyncDataWriter asyncDataWriter; + switch (ElasticsearchWriterConfigurationKeys.ClientType.valueOf( + ConfigUtils.getString(config, + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_CLIENT_TYPE, + 
ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_CLIENT_TYPE_DEFAULT).toUpperCase())) { + case REST: { + asyncDataWriter = new ElasticsearchRestWriter(config); + break; + } + case TRANSPORT: { + asyncDataWriter = new ElasticsearchTransportClientWriter(config); + break; + } + default: { + throw new IllegalArgumentException("Need to specify which " + + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_CLIENT_TYPE + + " client to use (rest/transport)"); + } + } + BufferedAsyncDataWriter bufferedAsyncDataWriter = new BufferedAsyncDataWriter(batchAccumulator, asyncDataWriter); + + double failureAllowance = ConfigUtils.getDouble(config, ElasticsearchWriterConfigurationKeys.FAILURE_ALLOWANCE_PCT_CONFIG, + ElasticsearchWriterConfigurationKeys.FAILURE_ALLOWANCE_PCT_DEFAULT) / 100.0; + boolean retriesEnabled = ConfigUtils.getBoolean(config, ElasticsearchWriterConfigurationKeys.RETRIES_ENABLED, + ElasticsearchWriterConfigurationKeys.RETRIES_ENABLED_DEFAULT); + int maxRetries = ConfigUtils.getInt(config, ElasticsearchWriterConfigurationKeys.MAX_RETRIES, + ElasticsearchWriterConfigurationKeys.MAX_RETRIES_DEFAULT); + + + return AsyncWriterManager.builder() + .failureAllowanceRatio(failureAllowance) + .retriesEnabled(retriesEnabled) + .numRetries(maxRetries) + .config(config) + .asyncDataWriter(bufferedAsyncDataWriter) + .build(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchRestWriter.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchRestWriter.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchRestWriter.java new file mode 100644 index 0000000..7cd77da --- /dev/null +++ 
b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchRestWriter.java @@ -0,0 +1,232 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gobblin.elasticsearch.writer;

import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.math3.util.Pair;
import org.apache.gobblin.password.PasswordManager;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.writer.Batch;
import org.apache.gobblin.writer.BatchAsyncDataWriter;
import org.apache.gobblin.writer.GenericWriteResponse;
import org.apache.gobblin.writer.WriteCallback;
import org.apache.gobblin.writer.WriteResponse;
import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentType;

import com.google.common.annotations.VisibleForTesting;
import com.typesafe.config.Config;

import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
import lombok.extern.slf4j.Slf4j;


/**
 * A {@link BatchAsyncDataWriter} that sends batches to Elasticsearch through the REST
 * high-level client's asynchronous bulk API, optionally over SSL.
 */
@Slf4j
public class ElasticsearchRestWriter extends ElasticsearchWriterBase implements BatchAsyncDataWriter<Object> {

  private final RestHighLevelClient client;
  private final RestClient lowLevelClient;

  /**
   * Builds the low-level and high-level REST clients from the writer configuration.
   *
   * @param config the writer configuration
   * @throws IOException if the clients cannot be created for any reason
   */
  ElasticsearchRestWriter(Config config)
      throws IOException {
    super(config);


    int threadCount = ConfigUtils.getInt(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_CLIENT_THREADPOOL_SIZE,
        ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_CLIENT_THREADPOOL_DEFAULT);
    try {

      PasswordManager passwordManager = PasswordManager.getInstance();
      boolean sslEnabled = ConfigUtils.getBoolean(config,
          ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_ENABLED,
          ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_ENABLED_DEFAULT);
      if (sslEnabled) {

        // keystore
        String keyStoreType = ConfigUtils
            .getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_KEYSTORE_TYPE,
                ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_KEYSTORE_TYPE_DEFAULT);
        String keyStoreFilePassword = passwordManager.readPassword(ConfigUtils
            .getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_KEYSTORE_PASSWORD, ""));
        String identityFilepath = ConfigUtils
            .getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_KEYSTORE_LOCATION, "");

        // truststore
        String trustStoreType = ConfigUtils
            .getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_TYPE,
                ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_TYPE_DEFAULT);
        String trustStoreFilePassword = passwordManager.readPassword(ConfigUtils
            .getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_PASSWORD, ""));
        String cacertsFilepath = ConfigUtils
            .getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_LOCATION, "");
        // Logged for diagnostics only; the path as configured is what gets opened.
        String truststoreAbsolutePath = Paths.get(cacertsFilepath).toAbsolutePath().normalize().toString();
        log.info("Truststore absolutePath is:" + truststoreAbsolutePath);


        this.lowLevelClient =
            buildRestClient(this.hostAddresses, threadCount, true, keyStoreType, keyStoreFilePassword, identityFilepath,
                trustStoreType, trustStoreFilePassword, cacertsFilepath);
      }
      else {
        this.lowLevelClient = buildRestClient(this.hostAddresses, threadCount);
      }
      this.client = new RestHighLevelClient(this.lowLevelClient);

      log.info("Elasticsearch Rest Writer configured successfully with: indexName={}, "
              + "indexType={}, idMappingEnabled={}, typeMapperClassName={}, ssl={}",
          this.indexName, this.indexType, this.idMappingEnabled, this.typeMapper.getClass().getCanonicalName(),
          sslEnabled);

    } catch (Exception e) {
      throw new IOException("Failed to instantiate rest elasticsearch client", e);
    }
  }

  @Override
  int getDefaultPort() {
    return ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_REST_WRITER_DEFAULT_PORT;
  }


  /** Builds a plain-HTTP client for the given hosts. */
  private static RestClient buildRestClient(List<InetSocketTransportAddress> hosts, int threadCount)
      throws Exception {
    return buildRestClient(hosts, threadCount, false, null, null, null, null, null, null);
  }


  /** Loads a key store of the given type from a file; the stream is always closed. */
  private static KeyStore loadKeyStore(String storeType, String storePath, String storePassword)
      throws Exception {
    KeyStore store = KeyStore.getInstance(storeType);
    try (FileInputStream storeStream = new FileInputStream(storePath)) {
      store.load(storeStream, storePassword.toCharArray());
    }
    return store;
  }


  //TODO: Support pass through of configuration (e.g. timeouts etc) of rest client from above
  /**
   * Builds the low-level REST client.
   *
   * <p>The SSL-related arguments are only read when {@code sslEnabled} is true.
   */
  private static RestClient buildRestClient(List<InetSocketTransportAddress> hosts, int threadCount, boolean sslEnabled,
      String keyStoreType, String keyStoreFilePassword, String identityFilepath, String trustStoreType,
      String trustStoreFilePassword, String cacertsFilepath) throws Exception {


    HttpHost[] httpHosts = new HttpHost[hosts.size()];
    String scheme = sslEnabled?"https":"http";
    for (int h = 0; h < httpHosts.length; h++) {
      InetSocketTransportAddress host = hosts.get(h);
      httpHosts[h] = new HttpHost(host.getAddress(), host.getPort(), scheme);
    }

    RestClientBuilder builder = RestClient.builder(httpHosts);

    if (sslEnabled) {
      log.info("ssl configuration: trustStoreType = {}, cacertsFilePath = {}", trustStoreType, cacertsFilepath);
      KeyStore truststore = loadKeyStore(trustStoreType, cacertsFilepath, trustStoreFilePassword);
      SSLContextBuilder sslBuilder = SSLContexts.custom().loadTrustMaterial(truststore, null);

      log.info("ssl key configuration: keyStoreType = {}, keyFilePath = {}", keyStoreType, identityFilepath);

      KeyStore keystore = loadKeyStore(keyStoreType, identityFilepath, keyStoreFilePassword);
      sslBuilder.loadKeyMaterial(keystore, keyStoreFilePassword.toCharArray());

      final SSLContext sslContext = sslBuilder.build();
      // NOTE(review): NoopHostnameVerifier turns off hostname verification of the server
      // certificate. Confirm this is intended outside of test setups.
      builder = builder.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder
          // Set ssl context
          .setSSLContext(sslContext).setSSLHostnameVerifier(new NoopHostnameVerifier())
          // Configure number of threads for clients
          .setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(threadCount).build()));
    } else {
      builder = builder.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder
          // Configure number of threads for clients
          .setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(threadCount).build()));
    }

    // Configure timeouts
    builder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
        .setConnectionRequestTimeout(0)); // Important, otherwise the client has spurious timeouts

    return builder.build();
  }

  /**
   * Submits the batch through the asynchronous bulk API.
   *
   * @param batch the records to write
   * @param callback optional callback handed to {@code prepareBatch} together with the batch
   * @return the future obtained from the prepared batch
   * @throws RuntimeException if submitting the bulk request fails
   */
  @Override
  public Future<WriteResponse> write(final Batch<Object> batch, @Nullable WriteCallback callback) {

    Pair<BulkRequest, FutureCallbackHolder> preparedBatch = this.prepareBatch(batch, callback);
    try {
      client.bulkAsync(preparedBatch.getFirst(), preparedBatch.getSecond().getActionListener());
      return preparedBatch.getSecond().getFuture();
    }
    catch (Exception e) {
      throw new RuntimeException("Caught unexpected exception while calling bulkAsync API", e);
    }
  }



  /** Nothing is buffered in this class, so there is nothing to flush. */
  @Override
  public void flush() throws IOException {

  }

  /**
   * Closes the base writer and then the low-level REST client.
   */
  @Override
  public void close() throws IOException {
    try {
      super.close();
    } finally {
      // Release the client's connections even when closing the base writer fails.
      this.lowLevelClient.close();
    }
  }

  @VisibleForTesting
  public RestHighLevelClient getRestHighLevelClient() {
    return this.client;
  }

  @VisibleForTesting
  public RestClient getRestLowLevelClient() {
    return this.lowLevelClient;
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchTransportClientWriter.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchTransportClientWriter.java 
b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchTransportClientWriter.java new file mode 100644 index 0000000..bb26fb5 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchTransportClientWriter.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.elasticsearch.writer; + +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.commons.math3.util.Pair; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.writer.Batch; +import org.apache.gobblin.writer.BatchAsyncDataWriter; +import org.apache.gobblin.writer.GenericWriteResponseWrapper; +import org.apache.gobblin.writer.WriteCallback; +import org.apache.gobblin.writer.WriteResponse; +import org.apache.gobblin.writer.WriteResponseFuture; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.transport.client.PreBuiltTransportClient; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.typesafe.config.Config; + +import javax.annotation.Nullable; +import lombok.extern.slf4j.Slf4j; + + +@Slf4j +class ElasticsearchTransportClientWriter extends ElasticsearchWriterBase implements BatchAsyncDataWriter<Object> { + + private final TransportClient client; + + ElasticsearchTransportClientWriter(Config config) throws UnknownHostException { + super(config); + // Check if ssl is being configured, throw error that transport client does not support ssl + Preconditions.checkArgument(!ConfigUtils.getBoolean(config, + 
ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_ENABLED, false), + "Transport client does not support ssl, try the Rest client instead"); + + this.client = createTransportClient(config); + + log.info("ElasticsearchWriter configured successfully with: indexName={}, indexType={}, idMappingEnabled={}, typeMapperClassName={}", + this.indexName, this.indexType, this.idMappingEnabled, this.typeMapper); + } + + @Override + int getDefaultPort() { + return ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_TRANSPORT_WRITER_DEFAULT_PORT; + } + + @Override + public Future<WriteResponse> write(Batch<Object> batch, @Nullable WriteCallback callback) { + + Pair<BulkRequest, FutureCallbackHolder> preparedBatch = this.prepareBatch(batch, callback); + client.bulk(preparedBatch.getFirst(), preparedBatch.getSecond().getActionListener()); + return preparedBatch.getSecond().getFuture(); + + } + + @Override + public void flush() throws IOException { + // Elasticsearch client doesn't support a flush method + } + + @Override + public void close() throws IOException { + log.info("Got a close call in ElasticSearchTransportWriter"); + super.close(); + this.client.close(); + } + + @VisibleForTesting + TransportClient getTransportClient() { + return this.client; + } + + private TransportClient createTransportClient(Config config) throws UnknownHostException { + TransportClient transportClient; + + // Set TransportClient settings + Settings.Builder settingsBuilder = Settings.builder(); + if (config.hasPath(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SETTINGS)) { + settingsBuilder.put(ConfigUtils.configToProperties(config, + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SETTINGS)); + } + settingsBuilder.put("client.transport.ignore_cluster_name",true); + settingsBuilder.put("client.transport.sniff", true); + transportClient = new PreBuiltTransportClient(settingsBuilder.build()); + this.hostAddresses.forEach(transportClient::addTransportAddress); + return 
transportClient; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterBase.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterBase.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterBase.java new file mode 100644 index 0000000..5238b50 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterBase.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.elasticsearch.writer; + +import java.io.Closeable; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang3.reflect.ConstructorUtils; +import org.apache.commons.math3.util.Pair; +import org.apache.gobblin.elasticsearch.typemapping.JsonSerializer; +import org.apache.gobblin.elasticsearch.typemapping.TypeMapper; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.writer.Batch; +import org.apache.gobblin.writer.WriteCallback; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.xcontent.XContentType; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.base.Throwables; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +/** + * A base class for different types of Elasticsearch writers + */ +@Slf4j +public abstract class ElasticsearchWriterBase implements Closeable { + protected final String indexName; + protected final String indexType; + protected final TypeMapper typeMapper; + protected final JsonSerializer serializer; + protected final boolean idMappingEnabled; + protected final String idFieldName; + List<InetSocketTransportAddress> hostAddresses; + protected final MalformedDocPolicy malformedDocPolicy; + + ElasticsearchWriterBase(Config config) + throws UnknownHostException { + + this.indexName = config.getString(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_NAME); + Preconditions.checkNotNull(this.indexName, "Index Name not provided. 
Please set " + + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_NAME); + Preconditions.checkArgument(this.indexName.equals(this.indexName.toLowerCase()), + "Index name must be lowercase, you provided " + this.indexName); + this.indexType = config.getString(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_TYPE); + Preconditions.checkNotNull(this.indexName, "Index Type not provided. Please set " + + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_TYPE); + this.idMappingEnabled = ConfigUtils.getBoolean(config, + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_ID_MAPPING_ENABLED, + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_ID_MAPPING_DEFAULT); + this.idFieldName = ConfigUtils.getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_ID_FIELD, + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_ID_FIELD_DEFAULT); + String typeMapperClassName = ConfigUtils.getString(config, + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS, + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS_DEFAULT); + if (typeMapperClassName.isEmpty()) { + throw new IllegalArgumentException(this.getClass().getCanonicalName() + " needs to be configured with " + + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS + " to enable type mapping"); + } + try { + Class<?> typeMapperClass = (Class<?>) Class.forName(typeMapperClassName); + + this.typeMapper = (TypeMapper) ConstructorUtils.invokeConstructor(typeMapperClass); + this.typeMapper.configure(config); + this.serializer = this.typeMapper.getSerializer(); + } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) { + log.error("Failed to instantiate type-mapper from class " + typeMapperClassName, e); + throw Throwables.propagate(e); + } + + this.malformedDocPolicy = 
MalformedDocPolicy.valueOf(ConfigUtils.getString(config, + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_MALFORMED_DOC_POLICY, + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_MALFORMED_DOC_POLICY_DEFAULT)); + + // If list is empty, connect to the default host and port + if (!config.hasPath(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_HOSTS)) { + InetSocketTransportAddress hostAddress = new InetSocketTransportAddress( + InetAddress.getByName(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_DEFAULT_HOST), + getDefaultPort()); + this.hostAddresses = new ArrayList<>(1); + this.hostAddresses.add(hostAddress); + log.info("Adding host {} to Elasticsearch writer", hostAddress); + } else { + // Get list of hosts + List<String> hosts = ConfigUtils.getStringList(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_HOSTS); + // Add host addresses + Splitter hostSplitter = Splitter.on(":").trimResults(); + this.hostAddresses = new ArrayList<>(hosts.size()); + for (String host : hosts) { + + List<String> hostSplit = hostSplitter.splitToList(host); + Preconditions.checkArgument(hostSplit.size() == 1 || hostSplit.size() == 2, + "Malformed host name for Elasticsearch writer: " + host + " host names must be of form [host] or [host]:[port]"); + + InetAddress hostInetAddress = InetAddress.getByName(hostSplit.get(0)); + InetSocketTransportAddress hostAddress = null; + + if (hostSplit.size() == 1) { + hostAddress = new InetSocketTransportAddress(hostInetAddress, this.getDefaultPort()); + } else if (hostSplit.size() == 2) { + hostAddress = new InetSocketTransportAddress(hostInetAddress, Integer.parseInt(hostSplit.get(1))); + } + this.hostAddresses.add(hostAddress); + log.info("Adding host {} to Elasticsearch writer", hostAddress); + } + } + } + + abstract int getDefaultPort(); + + + protected Pair<BulkRequest, FutureCallbackHolder> prepareBatch(Batch<Object> batch, WriteCallback callback) { + BulkRequest bulkRequest = new 
BulkRequest(); + final StringBuilder stringBuilder = new StringBuilder(); + for (Object record : batch.getRecords()) { + try { + byte[] serializedBytes = this.serializer.serializeToJson(record); + log.debug("serialized record: {}", serializedBytes); + IndexRequest indexRequest = new IndexRequest(this.indexName, this.indexType) + .source(serializedBytes, 0, serializedBytes.length, XContentType.JSON); + if (this.idMappingEnabled) { + String id = this.typeMapper.getValue(this.idFieldName, record); + indexRequest.id(id); + stringBuilder.append(";").append(id); + } + bulkRequest.add(indexRequest); + } + catch (Exception e) { + log.error("Encountered exception {}", e); + } + } + FutureCallbackHolder futureCallbackHolder = new FutureCallbackHolder(callback, + exception -> log.error("Batch: {} failed on ids; {} with exception {}", batch.getId(), + stringBuilder.toString(), exception), + this.malformedDocPolicy); + return new Pair(bulkRequest, futureCallbackHolder); + } + + @Override + public void close() throws IOException { + this.serializer.close(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f1bc746c/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterConfigurationKeys.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterConfigurationKeys.java new file mode 100644 index 0000000..0dad29d --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterConfigurationKeys.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.elasticsearch.writer; + +import org.apache.gobblin.elasticsearch.typemapping.JsonTypeMapper; + + +public class ElasticsearchWriterConfigurationKeys { + + private static final String ELASTICSEARCH_WRITER_PREFIX = "writer.elasticsearch"; + + private static String prefix(String value) { return ELASTICSEARCH_WRITER_PREFIX + "." 
+ value;}; + + public static final String ELASTICSEARCH_WRITER_SETTINGS = prefix("settings"); + public static final String ELASTICSEARCH_WRITER_HOSTS = prefix("hosts"); + public static final String ELASTICSEARCH_WRITER_INDEX_NAME = prefix("index.name"); + public static final String ELASTICSEARCH_WRITER_INDEX_TYPE = prefix("index.type"); + public static final String ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS = prefix("typeMapperClass"); + public static final String ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS_DEFAULT = JsonTypeMapper.class.getCanonicalName(); + public static final String ELASTICSEARCH_WRITER_ID_MAPPING_ENABLED = prefix("useIdFromData"); + public static final Boolean ELASTICSEARCH_WRITER_ID_MAPPING_DEFAULT = false; + public static final String ELASTICSEARCH_WRITER_ID_FIELD = prefix("idFieldName"); + public static final String ELASTICSEARCH_WRITER_ID_FIELD_DEFAULT = "id"; + public static final String ELASTICSEARCH_WRITER_CLIENT_TYPE = prefix("client.type"); + public static final String ELASTICSEARCH_WRITER_CLIENT_TYPE_DEFAULT = "REST"; + public static final String ELASTICSEARCH_WRITER_CLIENT_THREADPOOL_SIZE = prefix("client.threadPoolSize"); + public static final int ELASTICSEARCH_WRITER_CLIENT_THREADPOOL_DEFAULT = 5; + public static final String ELASTICSEARCH_WRITER_SSL_ENABLED=prefix("ssl.enabled"); + public static final boolean ELASTICSEARCH_WRITER_SSL_ENABLED_DEFAULT=false; + public static final String ELASTICSEARCH_WRITER_SSL_KEYSTORE_TYPE=prefix("ssl.keystoreType"); + public static final String ELASTICSEARCH_WRITER_SSL_KEYSTORE_TYPE_DEFAULT = "pkcs12"; + public static final String ELASTICSEARCH_WRITER_SSL_KEYSTORE_PASSWORD=prefix("ssl.keystorePassword"); + public static final String ELASTICSEARCH_WRITER_SSL_KEYSTORE_LOCATION=prefix("ssl.keystoreLocation"); + public static final String ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_TYPE=prefix("ssl.truststoreType"); + public static final String ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_TYPE_DEFAULT = "jks"; + public static 
final String ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_LOCATION=prefix("ssl.truststoreLocation"); + public static final String ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_PASSWORD=prefix("ssl.truststorePassword"); + public static final String ELASTICSEARCH_WRITER_MALFORMED_DOC_POLICY = prefix("malformedDocPolicy"); + public static final String ELASTICSEARCH_WRITER_MALFORMED_DOC_POLICY_DEFAULT = "FAIL"; + + //Async Writer Configuration + public static final String RETRIES_ENABLED = prefix("retriesEnabled"); + public static final boolean RETRIES_ENABLED_DEFAULT = true; + public static final String MAX_RETRIES = prefix("maxRetries"); + public static final int MAX_RETRIES_DEFAULT = 5; + static final String FAILURE_ALLOWANCE_PCT_CONFIG = prefix("failureAllowancePercentage"); + static final double FAILURE_ALLOWANCE_PCT_DEFAULT = 0.0; + + public enum ClientType { + TRANSPORT, + REST + } + + public static final String ELASTICSEARCH_WRITER_DEFAULT_HOST = "localhost"; + public static final int ELASTICSEARCH_TRANSPORT_WRITER_DEFAULT_PORT = 9300; + public static final int ELASTICSEARCH_REST_WRITER_DEFAULT_PORT = 9200; +}
