This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new fb289fd [GOBBLIN-880] Bump CouchbaseWriter Couchbase SDK version +
write docs + cert based auth + enable TTL + dnsSrv[]
fb289fd is described below
commit fb289fdba46c9db65ec650a2f42048e031d4416e
Author: Michael Menarguez <[email protected]>
AuthorDate: Mon Sep 16 20:25:09 2019 -0700
[GOBBLIN-880] Bump CouchbaseWriter Couchbase SDK version + write docs +
cert based auth + enable TTL + dnsSrv[]
JIRA ticket:
https://issues.apache.org/jira/browse/GOBBLIN-880
RB Changes:
1 - Added logic to connect using certificate based
auth to the buckets (Will need to bump
com.couchbase.client:java-client to a newer
version like 2.7.6) and associated configs
2 - TTL implementation
* Added configs to allow setting a TTL
(documentTTL) and also specify the timeunits
(documentTTLUnits) of these settings
* Added logic to specify the field containing the
source timestamp (documentTTLOriginField) and its
units (documentTTLOriginUnits), to disambiguate
between UNIX (sec) timestamps and other formats
like timestamps in milliseconds.
3 - Added missing dnsSrv config
4 - Wrote documentation in
gobblin-docs/writers/CouchbaseWriter.md
5 - Brought in CouchbaseMock from Gradle and adapted
existing unit tests.
6 - Added getTimeUnit to ConfigUtils + Unit tests
Closes #2734 from menarguez/gobblin-880
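
Item 6 above adds a getTimeUnit helper to ConfigUtils, and the docs in this commit describe the unit settings as case insensitive. The ConfigUtils diff is not shown in this part of the email, so the following is only a minimal sketch of how such a lookup could behave. It assumes a plain java.util.Properties source instead of a Typesafe Config, and the class and method below are hypothetical stand-ins, not the committed code.

```java
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for ConfigUtils.getTimeUnit (the real helper reads a Typesafe Config).
final class TimeUnitConfigSketch {

  /** Returns the TimeUnit named by props[key], ignoring case, or defaultUnit when the key is absent or blank. */
  static TimeUnit getTimeUnit(Properties props, String key, TimeUnit defaultUnit) {
    String raw = props.getProperty(key);
    if (raw == null || raw.trim().isEmpty()) {
      return defaultUnit;
    }
    // TimeUnit.valueOf is case sensitive, so normalize first.
    // An unknown unit name throws IllegalArgumentException.
    return TimeUnit.valueOf(raw.trim().toUpperCase(Locale.ROOT));
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("writer.couchbase.documentTTLUnits", "days");
    // Configured value, lower case in the config
    System.out.println(getTimeUnit(props, "writer.couchbase.documentTTLUnits", TimeUnit.SECONDS));
    // Missing key falls back to the supplied default
    System.out.println(getTimeUnit(props, "writer.couchbase.documentTTLOriginUnits", TimeUnit.MILLISECONDS));
  }
}
```

Failing fast on an unknown unit name is a design choice of this sketch; whether the committed helper throws or falls back to the default is not visible here.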
---
gobblin-docs/writers/CouchbaseWriter.md | 147 ++++++++++++++
gobblin-modules/gobblin-couchbase/build.gradle | 3 +-
.../writer/CouchbaseEnvironmentFactory.java | 13 +-
.../gobblin/couchbase/writer/CouchbaseWriter.java | 204 +++++++++++++------
.../couchbase/writer/CouchbaseWriterBuilder.java | 36 ++--
.../writer/CouchbaseWriterConfigurationKeys.java | 9 +
.../gobblin/couchbase/CouchbaseTestServer.java | 25 +--
.../couchbase/writer/CouchbaseWriterTest.java | 222 ++++++++++++++++++++-
.../java/org/apache/gobblin/util/ConfigUtils.java | 28 +++
.../org/apache/gobblin/util/ConfigUtilsTest.java | 32 +++
mkdocs.yml | 2 +
11 files changed, 606 insertions(+), 115 deletions(-)
diff --git a/gobblin-docs/writers/CouchbaseWriter.md
b/gobblin-docs/writers/CouchbaseWriter.md
new file mode 100644
index 0000000..2b520e5
--- /dev/null
+++ b/gobblin-docs/writers/CouchbaseWriter.md
@@ -0,0 +1,147 @@
+[TOC]
+
+# Introduction
+A CouchbaseWriter is both an AsyncDataWriter and a SyncDataWriter that pushes
Documents to a Couchbase bucket through the [Java
SDK](https://docs.couchbase.com/java-sdk/current/start-using-sdk.html). Note
that CouchbaseWriter only supports writing to a single bucket as there should be
only 1 CouchbaseEnvironment per JVM.
+
+
+# Record format
+Couchbase writer currently supports `AVRO` and `JSON` as data inputs. Both
require the following structured schema:
+
+
+| Document field | Description |
+| -------------- | ----------- |
+| `key` | Unique key used to store the document on the bucket. For more info
view [Couchbase
docs](https://developer.couchbase.com/documentation/server/3.x/developer/dev-guide-3.0/keys-values.html)
|
+| `data.data` | Object or value containing the information associated with the
`key` for this document |
+| `data.flags` | [Couchbase
flags](https://docs.couchbase.com/server/4.1/developer-guide/transcoders.html).
To store JSON in `data.data` use `0x02 << 24`; for UTF-8 text use `0x04 << 24`. |
+
+The following is a sample input record with JSON data:
+
+```json
+{
+ "key": "myKey123",
+ "data": {
+ "data": {
+ "field1": "field1Value",
+ "field2": 123
+ },
+ "flags": 33554432
+ }
+}
+```
+
+or to store plain text:
+
+```json
+{
+ "key": "myKey123",
+ "data": {
+ "data": "singleValueData",
+ "flags": 67108864
+ }
+}
+```
+
+If using AVRO, use the following schema:
+
+```json
+{
+ "type" : "record",
+ "name" : "topLevelRecord",
+ "fields" : [ {
+ "name" : "key",
+ "type" : "string"
+ }, {
+ "name" : "data",
+ "type" : {
+ "type" : "record",
+ "name" : "data",
+ "namespace" : "topLevelRecord",
+ "fields" : [ {
+ "name" : "data",
+ "type" : [ "bytes", "null" ]
+ }, {
+ "name" : "flags",
+ "type" : "int"
+ } ]
+ }
+ } ]
+}
+```
+Note that the key can be other than string if needed.
+# Configuration
+## General configuration values
+| Configuration Key | Default Value | Description |
+| ----------------- | ------------- | ----------- |
+| `writer.couchbase.bucket` | Optional | Name of the couchbase bucket. Change
if using other than default bucket |
+| `writer.couchbase.default` | `"default"` | Name of the default bucket if
`writer.couchbase.bucket` is not provided |
+| `writer.couchbase.dnsSrvEnabled` | `"false"` | Enable DNS SRV bootstrapping
[docs](https://docs.couchbase.com/java-sdk/current/managing-connections.html) |
+| `writer.couchbase.bootstrapServers` | `localhost` | URL to bootstrap servers.
If using DNS SRV set `writer.couchbase.dnsSrvEnabled` to true |
+| `writer.couchbase.sslEnabled` | `false` | Use SSL to connect to couchbase |
+| `writer.couchbase.password` | Optional | Bucket password. Will be ignored if
`writer.couchbase.certAuthEnabled` is true |
+| `writer.couchbase.certAuthEnabled` | `false` | Set to true if using
certificate authentication. Must also specify
`writer.couchbase.sslKeystoreFile`, `writer.couchbase.sslKeystorePassword`,
`writer.couchbase.sslTruststoreFile`, and
`writer.couchbase.sslTruststorePassword` |
+| `writer.couchbase.sslKeystoreFile` | Optional | Path to the keystore file
location |
+| `writer.couchbase.sslKeystorePassword` | Optional | Keystore password |
+| `writer.couchbase.sslTruststoreFile` | Optional | Path to the trustStore
file location |
+| `writer.couchbase.sslTruststorePassword` | Optional | TrustStore password |
+| `writer.couchbase.documentTTL` | `0` | Time To Live of each document. Units
are specified in `writer.couchbase.documentTTLUnits`. With the default of `0`
the writer leaves the document's expiry unchanged |
+| `writer.couchbase.documentTTLUnits` | `SECONDS` | Unit for
`writer.couchbase.documentTTL`. Must be one of
[java.util.concurrent.TimeUnit](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/TimeUnit.html).
Case insensitive |
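+| `writer.couchbase.documentTTLOriginField` | Optional | Name of the field in
the document that holds the origin timestamp from which the TTL is counted.
Units are specified in `writer.couchbase.documentTTLOriginUnits`. If not set,
the TTL is counted from write time |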
+| `writer.couchbase.documentTTLOriginUnits` | `MILLISECONDS` | Unit for the
timestamp found in `writer.couchbase.documentTTLOriginField`. Must be one of
[java.util.concurrent.TimeUnit](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/TimeUnit.html).
Case insensitive. As an example, an origin field value of `1568240399000` with
a `writer.couchbase.documentTTLOriginUnits` value of `MILLISECONDS` corresponds
to `Wed Sep 11 15:19:59 PDT 2019` |
+| `writer.couchbase.retriesEnabled` | `false` | Enable write retries on
failures |
+| `writer.couchbase.maxRetries` | `5` | Maximum number of retries |
+| `writer.couchbase.failureAllowancePercentage` | `0.0` | The percentage of
failures that you are willing to tolerate while writing to Couchbase. Gobblin
will mark the workunit successful and move on if there are failures but not
enough to trip the failure threshold. Only successfully acknowledged writes are
counted as successful, all others are considered as failures. The default for
the failureAllowancePercentage is set to 0.0. For example, if the value is set
to 0.2 This means that as [...]
+| `writer.couchbase.operationTimeoutMillis` | `10000` | Global timeout, in
milliseconds, for Couchbase communication operations |
+
+## Authentication
+### No credentials
+NOT RECOMMENDED FOR PRODUCTION.
+
+Set neither `writer.couchbase.certAuthEnabled` nor `writer.couchbase.password`.
+### Using certificates
+Set `writer.couchbase.certAuthEnabled` to `true` and values for
`writer.couchbase.sslKeystoreFile`, `writer.couchbase.sslKeystorePassword`,
`writer.couchbase.sslTruststoreFile`, and
`writer.couchbase.sslTruststorePassword`.
+
+The `writer.couchbase.password` setting will be ignored if
`writer.couchbase.certAuthEnabled` is set.
+### Using bucket password
+Set `writer.couchbase.password`
+
+## Document level expiration
+Couchbase writer allows setting expiration at the document level using the
[expiry](https://docs.couchbase.com/java-sdk/current/document-operations.html)
property of the Couchbase document. Please note that the current Couchbase
implementation using timestamps limits expiry to January 19, 2038 03:14:07 GMT
because the expiry type is int. For simplicity, CouchbaseWriter only works with
absolute timestamps and does not use relative expiration in seconds (<30 days).
+The following modes are supported:
+### 1 - Expiration from write time
+Define only `writer.couchbase.documentTTL` and
`writer.couchbase.documentTTLUnits`. For example, for a 2-day expiration the
configs would look like:
+
+| Configuration Key | Value |
+| ----------------- | ------------- |
+| `writer.couchbase.documentTTL` | `2` |
+| `writer.couchbase.documentTTLUnits` | `DAYS` |
+
+### 2 - Expiration from an origin timestamp
+Define `writer.couchbase.documentTTL`, `writer.couchbase.documentTTLUnits`,
`writer.couchbase.documentTTLOriginField` and
`writer.couchbase.documentTTLOriginUnits`.
+
+For example, for a 2-day expiration counted from the `header.time` field, which
holds a timestamp in MILLISECONDS, the configs would look like:
+
+| Configuration Key | Value |
+| ----------------- | ------------- |
+| `writer.couchbase.documentTTL` | `2` |
+| `writer.couchbase.documentTTLUnits` | `"DAYS"` |
+| `writer.couchbase.documentTTLOriginField` | `"header.time"` |
+| `writer.couchbase.documentTTLOriginUnits` | `"MILLISECONDS"` |
+
+So a sample document with origin on 1568240399 (Wed Sep 11 15:19:59 PDT 2019)
would expire on 1568413199 (Fri Sep 13 15:19:59 PDT 2019). The following is a
sample record format.
+
+```json
+{
+ "key": "sampleKey",
+ "data": {
+ "data": {
+ "field1": "field1Value",
+ "header": {
+ "time": 1568240399000
+ }
+ },
+ "flags": 33554432
+ }
+}
+```
+
+}
\ No newline at end of file
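
The `data.flags` values in the sample records of CouchbaseWriter.md above (33554432 and 67108864) are the shifted constants named in the flags table row. The short sketch below only spells out that arithmetic; the class and constant names are hypothetical and are not part of the Couchbase SDK or of this commit.

```java
// Hypothetical helper that spells out the flag arithmetic used in the sample records.
final class CouchbaseFlagsSketch {
  // The format id is shifted into the top byte of the 32-bit flags value.
  static final int JSON_FLAGS = 0x02 << 24;   // value used for JSON content
  static final int STRING_FLAGS = 0x04 << 24; // value used for UTF-8 text content

  public static void main(String[] args) {
    System.out.println(JSON_FLAGS);   // prints 33554432
    System.out.println(STRING_FLAGS); // prints 67108864
  }
}
```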
diff --git a/gobblin-modules/gobblin-couchbase/build.gradle
b/gobblin-modules/gobblin-couchbase/build.gradle
index 310e4b6..939881e 100644
--- a/gobblin-modules/gobblin-couchbase/build.gradle
+++ b/gobblin-modules/gobblin-couchbase/build.gradle
@@ -23,7 +23,7 @@ dependencies {
compile project(":gobblin-utility")
compile project(":gobblin-metrics-libs:gobblin-metrics")
- compile "com.couchbase.client:java-client:2.5.4"
+ compile "com.couchbase.client:java-client:2.7.6"
compile externalDependency.avro
compile externalDependency.jacksonCore
compile externalDependency.jacksonMapper
@@ -44,6 +44,7 @@ dependencies {
testCompile externalDependency.jsonAssert
testCompile externalDependency.mockito
testCompile externalDependency.testng
+ testCompile "com.couchbase.mock:CouchbaseMock:1.5.23"
}
diff --git
a/gobblin-modules/gobblin-couchbase/src/main/java/org/apache/gobblin/couchbase/writer/CouchbaseEnvironmentFactory.java
b/gobblin-modules/gobblin-couchbase/src/main/java/org/apache/gobblin/couchbase/writer/CouchbaseEnvironmentFactory.java
index 084c172..66972e5 100644
---
a/gobblin-modules/gobblin-couchbase/src/main/java/org/apache/gobblin/couchbase/writer/CouchbaseEnvironmentFactory.java
+++
b/gobblin-modules/gobblin-couchbase/src/main/java/org/apache/gobblin/couchbase/writer/CouchbaseEnvironmentFactory.java
@@ -45,23 +45,22 @@ public class CouchbaseEnvironmentFactory {
String sslTruststoreFile = ConfigUtils.getString(config,
CouchbaseWriterConfigurationKeys.SSL_TRUSTSTORE_FILE, "");
String sslTruststorePassword = ConfigUtils.getString(config,
CouchbaseWriterConfigurationKeys.SSL_TRUSTSTORE_PASSWORD, "");
Boolean certAuthEnabled = ConfigUtils.getBoolean(config,
CouchbaseWriterConfigurationKeys.CERT_AUTH_ENABLED, false);
+ Boolean dnsSrvEnabled = ConfigUtils.getBoolean(config,
CouchbaseWriterConfigurationKeys.DNS_SRV_ENABLED, false);
- DefaultCouchbaseEnvironment.Builder builder = DefaultCouchbaseEnvironment
- .builder()
+
+ DefaultCouchbaseEnvironment.Builder builder =
DefaultCouchbaseEnvironment.builder()
.sslEnabled(sslEnabled)
.sslKeystoreFile(sslKeystoreFile)
.sslKeystorePassword(sslKeystorePassword)
.sslTruststoreFile(sslTruststoreFile)
.sslTruststorePassword(sslTruststorePassword)
- .certAuthEnabled(certAuthEnabled);
+ .certAuthEnabled(certAuthEnabled)
+ .dnsSrvEnabled(dnsSrvEnabled);
if (couchbaseEnvironment == null)
{
couchbaseEnvironment = builder.build();
- return couchbaseEnvironment;
- }
- else {
- return couchbaseEnvironment;
}
+ return couchbaseEnvironment;
}
}
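
The factory change above keeps a single shared CouchbaseEnvironment and builds it only on first use. As a rough illustration of that lazy, build-once pattern, here is a generic sketch that does not use the Couchbase SDK. The class name is hypothetical, and the `synchronized` keyword is an assumption of this sketch: the hunk above does not show whether the real getInstance is synchronized.

```java
import java.util.function.Supplier;

// Hypothetical generic holder illustrating a lazily built, shared instance.
final class SharedInstanceSketch<T> {
  private final Supplier<T> factory;
  private T instance;

  SharedInstanceSketch(Supplier<T> factory) {
    this.factory = factory;
  }

  /** Builds the instance on the first call and returns the same object afterwards. */
  synchronized T get() {
    if (instance == null) {
      instance = factory.get();
    }
    return instance;
  }

  public static void main(String[] args) {
    SharedInstanceSketch<Object> holder = new SharedInstanceSketch<>(Object::new);
    // Both calls return the same object, so this prints true.
    System.out.println(holder.get() == holder.get());
  }
}
```

One consequence of this pattern, visible in the factory above as well, is that configuration passed on later calls has no effect once the first instance has been built.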
diff --git
a/gobblin-modules/gobblin-couchbase/src/main/java/org/apache/gobblin/couchbase/writer/CouchbaseWriter.java
b/gobblin-modules/gobblin-couchbase/src/main/java/org/apache/gobblin/couchbase/writer/CouchbaseWriter.java
index 46c6821..c2bd550 100644
---
a/gobblin-modules/gobblin-couchbase/src/main/java/org/apache/gobblin/couchbase/writer/CouchbaseWriter.java
+++
b/gobblin-modules/gobblin-couchbase/src/main/java/org/apache/gobblin/couchbase/writer/CouchbaseWriter.java
@@ -17,19 +17,6 @@
package org.apache.gobblin.couchbase.writer;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.math3.util.Pair;
-
import com.couchbase.client.core.lang.Tuple;
import com.couchbase.client.core.lang.Tuple2;
import com.couchbase.client.core.message.ResponseStatus;
@@ -38,20 +25,33 @@ import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.CouchbaseCluster;
+import com.couchbase.client.java.auth.CertAuthenticator;
import com.couchbase.client.java.document.AbstractDocument;
-import com.couchbase.client.java.document.Document;
import com.couchbase.client.java.document.RawJsonDocument;
import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.couchbase.client.java.transcoder.Transcoder;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
import com.typesafe.config.Config;
-
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
-import rx.Observable;
-import rx.Subscriber;
-
+import org.apache.commons.math3.util.Pair;
import org.apache.gobblin.couchbase.common.TupleDocument;
+import org.apache.gobblin.source.extractor.DataRecordException;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.writer.AsyncDataWriter;
import org.apache.gobblin.writer.GenericWriteResponse;
@@ -61,6 +61,8 @@ import org.apache.gobblin.writer.WriteCallback;
import org.apache.gobblin.writer.WriteResponse;
import org.apache.gobblin.writer.WriteResponseFuture;
import org.apache.gobblin.writer.WriteResponseMapper;
+import rx.Observable;
+import rx.Subscriber;
/**
@@ -72,6 +74,11 @@ public class CouchbaseWriter<D extends AbstractDocument>
implements AsyncDataWri
private final Cluster _cluster;
private final Bucket _bucket;
private final long _operationTimeout;
+ private final int _documentTTL;
+ private final TimeUnit _documentTTLTimeUnits;
+ private final String _documentTTLOriginField;
+ private final TimeUnit _documentTTLOriginUnits;
+
private final TimeUnit _operationTimeunit;
private final WriteResponseMapper<D> _defaultWriteResponseMapper;
@@ -109,29 +116,41 @@ public class CouchbaseWriter<D extends AbstractDocument>
implements AsyncDataWri
public CouchbaseWriter(CouchbaseEnvironment couchbaseEnvironment, Config
config) {
List<String> hosts = ConfigUtils.getStringList(config,
CouchbaseWriterConfigurationKeys.BOOTSTRAP_SERVERS);
+ boolean usesCertAuth = ConfigUtils.getBoolean(config,
CouchbaseWriterConfigurationKeys.CERT_AUTH_ENABLED, false);
String password = ConfigUtils.getString(config,
CouchbaseWriterConfigurationKeys.PASSWORD, "");
+ log.info("Using hosts: {}",
hosts.stream().collect(Collectors.joining(",")));
+
+ _documentTTL = ConfigUtils.getInt(config,
CouchbaseWriterConfigurationKeys.DOCUMENT_TTL, 0);
+ _documentTTLTimeUnits =
+ ConfigUtils.getTimeUnit(config,
CouchbaseWriterConfigurationKeys.DOCUMENT_TTL_UNIT,
CouchbaseWriterConfigurationKeys.DOCUMENT_TTL_UNIT_DEFAULT);
+ _documentTTLOriginField =
+ ConfigUtils.getString(config,
CouchbaseWriterConfigurationKeys.DOCUMENT_TTL_ORIGIN_FIELD, null);
+ _documentTTLOriginUnits =
+ ConfigUtils.getTimeUnit(config,
CouchbaseWriterConfigurationKeys.DOCUMENT_TTL_ORIGIN_FIELD_UNITS,
+
CouchbaseWriterConfigurationKeys.DOCUMENT_TTL_ORIGIN_FIELD_UNITS_DEFAULT);
+
String bucketName = ConfigUtils.getString(config,
CouchbaseWriterConfigurationKeys.BUCKET,
CouchbaseWriterConfigurationKeys.BUCKET_DEFAULT);
_cluster = CouchbaseCluster.create(couchbaseEnvironment, hosts);
- if(!password.isEmpty()) {
- _bucket = _cluster.openBucket(bucketName, password,
- Collections.<Transcoder<? extends Document,
?>>singletonList(_tupleDocumentTranscoder));
+ if (usesCertAuth) {
+ _cluster.authenticate(CertAuthenticator.INSTANCE);
+ _bucket = _cluster.openBucket(bucketName,
Collections.singletonList(_tupleDocumentTranscoder));
+ } else if (password.isEmpty()) {
+ _bucket = _cluster.openBucket(bucketName,
Collections.singletonList(_tupleDocumentTranscoder));
} else {
- _bucket = _cluster.openBucket(bucketName,
- Collections.<Transcoder<? extends Document,
?>>singletonList(_tupleDocumentTranscoder));
+ _bucket = _cluster.openBucket(bucketName, password,
Collections.singletonList(_tupleDocumentTranscoder));
}
-
_operationTimeout = ConfigUtils.getLong(config,
CouchbaseWriterConfigurationKeys.OPERATION_TIMEOUT_MILLIS,
CouchbaseWriterConfigurationKeys.OPERATION_TIMEOUT_DEFAULT);
_operationTimeunit = TimeUnit.MILLISECONDS;
_defaultWriteResponseMapper = new GenericWriteResponseWrapper<>();
- log.info("Couchbase writer configured with: hosts: {}, bucketName: {},
operationTimeoutInMillis: {}",
- hosts, bucketName, _operationTimeout);
+ log.info("Couchbase writer configured with: hosts: {}, bucketName: {},
operationTimeoutInMillis: {}", hosts,
+ bucketName, _operationTimeout);
}
@VisibleForTesting
@@ -152,7 +171,13 @@ public class CouchbaseWriter<D extends AbstractDocument>
implements AsyncDataWri
if (record instanceof TupleDocument) {
((TupleDocument) record).content().value1().retain();
}
- Observable<D> observable = _bucket.async().upsert(record);
+ Observable<D> observable;
+ try {
+ observable = _bucket.async().upsert(setDocumentTTL(record));
+ } catch (DataRecordException e) {
+ throw new RuntimeException("Caught exception trying to set TTL of the
document", e);
+ }
+
if (callback == null) {
return new WriteResponseFuture<>(
observable.timeout(_operationTimeout,
_operationTimeunit).toBlocking().toFuture(),
@@ -179,10 +204,9 @@ public class CouchbaseWriter<D extends AbstractDocument>
implements AsyncDataWri
}
@Override
- public WriteResponse get()
- throws InterruptedException, ExecutionException {
+ public WriteResponse get() throws InterruptedException,
ExecutionException {
Pair<WriteResponse, Throwable> writeResponseThrowablePair =
writeResponseQueue.take();
- return getWriteResponseorThrow(writeResponseThrowablePair);
+ return getWriteResponseOrThrow(writeResponseThrowablePair);
}
@Override
@@ -192,49 +216,47 @@ public class CouchbaseWriter<D extends AbstractDocument>
implements AsyncDataWri
if (writeResponseThrowablePair == null) {
throw new TimeoutException("Timeout exceeded while waiting for
future to be done");
} else {
- return getWriteResponseorThrow(writeResponseThrowablePair);
+ return getWriteResponseOrThrow(writeResponseThrowablePair);
}
}
};
- observable.timeout(_operationTimeout, _operationTimeunit)
- .subscribe(new Subscriber<D>() {
- @Override
- public void onCompleted() {
- }
+ observable.timeout(_operationTimeout, _operationTimeunit).subscribe(new
Subscriber<D>() {
+ @Override
+ public void onCompleted() {
+ }
- @Override
- public void onError(Throwable e) {
- callbackFired.set(true);
- writeResponseQueue.add(new Pair<WriteResponse, Throwable>(null,
e));
- callback.onFailure(e);
- }
+ @Override
+ public void onError(Throwable e) {
+ callbackFired.set(true);
+ writeResponseQueue.add(new Pair<WriteResponse, Throwable>(null, e));
+ callback.onFailure(e);
+ }
- @Override
- public void onNext(D doc) {
- try {
- callbackFired.set(true);
- WriteResponse writeResponse = new GenericWriteResponse<D>(doc);
- writeResponseQueue.add(new Pair<WriteResponse,
Throwable>(writeResponse, null));
- callback.onSuccess(writeResponse);
- } finally {
- if (doc instanceof TupleDocument) {
- ((TupleDocument) doc).content().value1().release();
- }
- }
+ @Override
+ public void onNext(D doc) {
+ try {
+ callbackFired.set(true);
+ WriteResponse writeResponse = new GenericWriteResponse<D>(doc);
+ writeResponseQueue.add(new Pair<WriteResponse,
Throwable>(writeResponse, null));
+ callback.onSuccess(writeResponse);
+ } finally {
+ if (doc instanceof TupleDocument) {
+ ((TupleDocument) doc).content().value1().release();
}
- });
+ }
+ }
+ });
return writeResponseFuture;
}
}
@Override
- public void flush()
- throws IOException {
+ public void flush() throws IOException {
}
- private WriteResponse getWriteResponseorThrow(Pair<WriteResponse, Throwable>
writeResponseThrowablePair)
+ private WriteResponse getWriteResponseOrThrow(Pair<WriteResponse, Throwable>
writeResponseThrowablePair)
throws ExecutionException {
if (writeResponseThrowablePair.getFirst() != null) {
return writeResponseThrowablePair.getFirst();
@@ -246,17 +268,71 @@ public class CouchbaseWriter<D extends AbstractDocument>
implements AsyncDataWri
}
@Override
- public void cleanup()
- throws IOException {
+ public void cleanup() throws IOException {
}
- @Override
- public WriteResponse write(D record)
- throws IOException {
+ /**
+ * Returns a new document with 32 bit (int) timestamp expiration date for
the document. Note this is a current limitation in couchbase.
+ * This approach should work for documents that do not expire until 2038.
This should be enough headroom for couchbase
+ * to reimplement the design.
+ * Source:
https://forums.couchbase.com/t/document-expiry-in-seconds-or-a-timestamp/6519/6
+ * @param record
+ * @return
+ */
+ private D setDocumentTTL(D record) throws DataRecordException {
+ boolean recordIsTupleDocument = record instanceof TupleDocument;
+ boolean recordIsJsonDocument = record instanceof RawJsonDocument;
+
+ long ttlSpanSec = TimeUnit.SECONDS.convert(_documentTTL,
_documentTTLTimeUnits);
+ long eventOriginSec = 0;
+ String dataJson = null;
+ if (_documentTTL == 0) {
+ return record;
+ } else if (_documentTTLOriginField != null &&
!_documentTTLOriginField.isEmpty()) {
+
+ if (recordIsTupleDocument) {
+ ByteBuf dataByteBuffer = ((Tuple2<ByteBuf, Integer>)
record.content()).value1();
+ dataJson = new String(dataByteBuffer.array(), StandardCharsets.UTF_8);
+ } else {
+ dataJson = (String) record.content();
+ }
+ JsonElement jsonDataRootElement = new JsonParser().parse(dataJson);
+ if (!jsonDataRootElement.isJsonObject()) {
+ throw new DataRecordException(
+ String.format("Document TTL Field is set but the record's value is
not a valid json object.: '%s'",
+ jsonDataRootElement.toString()));
+ }
+ JsonObject jsonDataRoot = jsonDataRootElement.getAsJsonObject();
+ long documentTTLOrigin =
jsonDataRoot.get(_documentTTLOriginField).getAsLong();
+ eventOriginSec = TimeUnit.SECONDS.convert(documentTTLOrigin,
_documentTTLOriginUnits);
+ } else {
+ eventOriginSec = System.currentTimeMillis() / 1000;
+ }
+ try {
+ int expiration = Math.toIntExact(ttlSpanSec + eventOriginSec);
+ if (recordIsTupleDocument) {
+ return (D) _tupleDocumentTranscoder.newDocument(record.id(),
expiration,
+ (Tuple2<ByteBuf, Integer>) record.content(), record.cas(),
record.mutationToken());
+ } else if (recordIsJsonDocument) {
+ return (D) RawJsonDocument.create(record.id(), expiration, (String)
record.content(), record.cas(),
+ record.mutationToken());
+ } else {
+ throw new RuntimeException(" Only TupleDocument and RawJsonDocument
documents are supported");
+ }
+ } catch (ArithmeticException e) {
+ throw new RuntimeException(
+ "There was an overflow calculating the expiry timestamp. couchbase
currently only supports expiry until January 19, 2038 03:14:07 GMT",
+ e);
+ }
+ }
+ @Override
+ public WriteResponse write(D record) throws IOException {
try {
- D doc = _bucket.upsert(record);
+ D doc = _bucket.upsert(setDocumentTTL(record));
+
+ Preconditions.checkNotNull(doc);
return new GenericWriteResponse(doc);
} catch (Exception e) {
throw new IOException("Failed to write to Couchbase cluster", e);
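
The expiry arithmetic in setDocumentTTL above can be checked in isolation. The sketch below mirrors only the time computation (TTL converted to seconds, plus the origin converted to seconds, narrowed to an int) and leaves out the document handling. The class and method names are hypothetical.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical standalone version of the expiry arithmetic in setDocumentTTL.
final class TtlExpirySketch {

  /** Absolute expiry in epoch seconds: origin (converted to seconds) plus TTL (converted to seconds). */
  static int expiryEpochSeconds(long ttl, TimeUnit ttlUnit, long origin, TimeUnit originUnit) {
    long ttlSec = TimeUnit.SECONDS.convert(ttl, ttlUnit);
    long originSec = TimeUnit.SECONDS.convert(origin, originUnit);
    // Throws ArithmeticException when the sum does not fit in a signed 32-bit int,
    // which is the January 19, 2038 03:14:07 GMT limit mentioned in the docs.
    return Math.toIntExact(ttlSec + originSec);
  }

  public static void main(String[] args) {
    // Origin timestamp mode: the documented example, 2 days after 1568240399000 ms.
    System.out.println(expiryEpochSeconds(2, TimeUnit.DAYS, 1568240399000L, TimeUnit.MILLISECONDS)); // prints 1568413199

    // Write time mode: the origin is the current wall-clock time.
    System.out.println(expiryEpochSeconds(2, TimeUnit.DAYS, System.currentTimeMillis(), TimeUnit.MILLISECONDS));
  }
}
```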
diff --git
a/gobblin-modules/gobblin-couchbase/src/main/java/org/apache/gobblin/couchbase/writer/CouchbaseWriterBuilder.java
b/gobblin-modules/gobblin-couchbase/src/main/java/org/apache/gobblin/couchbase/writer/CouchbaseWriterBuilder.java
index 7e8625b..de155da 100644
---
a/gobblin-modules/gobblin-couchbase/src/main/java/org/apache/gobblin/couchbase/writer/CouchbaseWriterBuilder.java
+++
b/gobblin-modules/gobblin-couchbase/src/main/java/org/apache/gobblin/couchbase/writer/CouchbaseWriterBuilder.java
@@ -17,34 +17,32 @@
package org.apache.gobblin.couchbase.writer;
-import java.io.IOException;
-import java.util.Properties;
-
import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.typesafe.config.Config;
-
+import java.io.IOException;
+import java.util.Properties;
+import junit.framework.Assert;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.writer.AsyncWriterManager;
import org.apache.gobblin.writer.AsyncDataWriter;
+import org.apache.gobblin.writer.AsyncWriterManager;
import org.apache.gobblin.writer.DataWriter;
import org.apache.gobblin.writer.DataWriterBuilder;
-
-
+import org.apache.log4j.Logger;
public class CouchbaseWriterBuilder extends DataWriterBuilder {
- @Override
- public DataWriter build()
- throws IOException {
- State state = this.destination.getProperties();
- Properties taskProps = state.getProperties();
- Config config = ConfigUtils.propertiesToConfig(taskProps);
+ private static final Logger LOG =
Logger.getLogger(CouchbaseWriterBuilder.class);
+ public DataWriter build(Config config) throws IOException {
+ Assert.assertNotNull("Config cannot be null", config);
+ config.entrySet().stream().forEach(x -> String.format("Config passed to
factory builder '%s':'%s'", x.getKey(), x.getValue().toString()));
CouchbaseEnvironment couchbaseEnvironment =
CouchbaseEnvironmentFactory.getInstance(config);
+
//TODO: Read config to decide whether to build a blocking writer or an
async writer
- double failureAllowance = ConfigUtils.getDouble(config,
CouchbaseWriterConfigurationKeys.FAILURE_ALLOWANCE_PCT_CONFIG,
- CouchbaseWriterConfigurationKeys.FAILURE_ALLOWANCE_PCT_DEFAULT) /
100.0;
+ double failureAllowance =
+ ConfigUtils.getDouble(config,
CouchbaseWriterConfigurationKeys.FAILURE_ALLOWANCE_PCT_CONFIG,
+ CouchbaseWriterConfigurationKeys.FAILURE_ALLOWANCE_PCT_DEFAULT) /
100.0;
boolean retriesEnabled = ConfigUtils.getBoolean(config,
CouchbaseWriterConfigurationKeys.RETRIES_ENABLED,
CouchbaseWriterConfigurationKeys.RETRIES_ENABLED_DEFAULT);
@@ -62,4 +60,12 @@ public class CouchbaseWriterBuilder extends
DataWriterBuilder {
.config(config)
.build();
}
+
+ @Override
+ public DataWriter build() throws IOException {
+ State state = this.destination.getProperties();
+ Properties taskProps = state.getProperties();
+ Config config = ConfigUtils.propertiesToConfig(taskProps);
+ return build(config);
+ }
}
diff --git
a/gobblin-modules/gobblin-couchbase/src/main/java/org/apache/gobblin/couchbase/writer/CouchbaseWriterConfigurationKeys.java
b/gobblin-modules/gobblin-couchbase/src/main/java/org/apache/gobblin/couchbase/writer/CouchbaseWriterConfigurationKeys.java
index 9d4400b..56dc5c1 100644
---
a/gobblin-modules/gobblin-couchbase/src/main/java/org/apache/gobblin/couchbase/writer/CouchbaseWriterConfigurationKeys.java
+++
b/gobblin-modules/gobblin-couchbase/src/main/java/org/apache/gobblin/couchbase/writer/CouchbaseWriterConfigurationKeys.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.couchbase.writer;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.TimeUnit;
public class CouchbaseWriterConfigurationKeys {
@@ -40,6 +41,14 @@ public class CouchbaseWriterConfigurationKeys {
public static final String SSL_TRUSTSTORE_FILE = prefix("sslTruststoreFile");
public static final String SSL_TRUSTSTORE_PASSWORD =
prefix("sslTruststorePassword");
public static final String CERT_AUTH_ENABLED = prefix("certAuthEnabled");
+ public static final String DNS_SRV_ENABLED = prefix("dnsSrvEnabled");
+
+ public static final String DOCUMENT_TTL = prefix("documentTTL");
+ public static final String DOCUMENT_TTL_UNIT = prefix("documentTTLUnits");
+ public static final TimeUnit DOCUMENT_TTL_UNIT_DEFAULT = TimeUnit.SECONDS;
+ public static final String DOCUMENT_TTL_ORIGIN_FIELD =
prefix("documentTTLOriginField");
+ public static final String DOCUMENT_TTL_ORIGIN_FIELD_UNITS =
prefix("documentTTLOriginUnits");
+ public static final TimeUnit DOCUMENT_TTL_ORIGIN_FIELD_UNITS_DEFAULT =
TimeUnit.MILLISECONDS;
public static final String OPERATION_TIMEOUT_MILLIS =
prefix("operationTimeoutMillis");
public static final long OPERATION_TIMEOUT_DEFAULT = 10000; // 10 second
default timeout
diff --git
a/gobblin-modules/gobblin-couchbase/src/test/java/org/apache/gobblin/couchbase/CouchbaseTestServer.java
b/gobblin-modules/gobblin-couchbase/src/test/java/org/apache/gobblin/couchbase/CouchbaseTestServer.java
index 63ae423..da83c82 100644
---
a/gobblin-modules/gobblin-couchbase/src/test/java/org/apache/gobblin/couchbase/CouchbaseTestServer.java
+++
b/gobblin-modules/gobblin-couchbase/src/test/java/org/apache/gobblin/couchbase/CouchbaseTestServer.java
@@ -17,8 +17,8 @@
package org.apache.gobblin.couchbase;
+import com.couchbase.mock.CouchbaseMock;
import java.io.BufferedReader;
-import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
@@ -47,11 +47,7 @@ import org.apache.gobblin.test.TestUtils;
@Slf4j
public class CouchbaseTestServer {
- private static final String
COUCHBASE_JAR_PATH="gobblin-modules/gobblin-couchbase/mock-couchbase/target/";
- private static final String COUCHBASE_MOCK_JAR=COUCHBASE_JAR_PATH +
"CouchbaseMock-1.5.18.jar";
-
- private Process couchbaseProcess;
private int _port;
private int _serverPort;
@@ -62,12 +58,8 @@ public class CouchbaseTestServer {
public void start()
{
-
log.info("Starting couchbase server on port " + _port);
- String[] commands = {"/usr/bin/java",
- "-cp",
- COUCHBASE_MOCK_JAR,
- "com.couchbase.mock.CouchbaseMock",
+ String[] commands = {
"--port",
_port +"",
"-n",
@@ -81,7 +73,7 @@ public class CouchbaseTestServer {
try {
System.out.println("Will run command " + Arrays.toString(commands));
- couchbaseProcess = new
ProcessBuilder().inheritIO().command(commands).start();
+ CouchbaseMock.main(commands);
}
catch (Exception e)
{
@@ -157,16 +149,7 @@ public class CouchbaseTestServer {
public int getPort() { return _port; }
- public void stop() {
-
- if (couchbaseProcess != null) {
- try {
- couchbaseProcess.destroy();
- } catch (Exception e) {
- log.warn("Failed to stop the couchbase server", e);
- }
- }
- }
+ public void stop() {}
@Test
diff --git
a/gobblin-modules/gobblin-couchbase/src/test/java/org/apache/gobblin/couchbase/writer/CouchbaseWriterTest.java
b/gobblin-modules/gobblin-couchbase/src/test/java/org/apache/gobblin/couchbase/writer/CouchbaseWriterTest.java
index c3927de..5a01de9 100644
---
a/gobblin-modules/gobblin-couchbase/src/test/java/org/apache/gobblin/couchbase/writer/CouchbaseWriterTest.java
+++
b/gobblin-modules/gobblin-couchbase/src/test/java/org/apache/gobblin/couchbase/writer/CouchbaseWriterTest.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.couchbase.writer;
+import com.google.gson.JsonObject;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
@@ -26,6 +27,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque;
@@ -40,6 +42,7 @@ import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.math3.util.Pair;
+import org.apache.gobblin.writer.GenericWriteResponse;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
@@ -135,9 +138,7 @@ public class CouchbaseWriterTest {
@Test
public void testTupleDocumentWrite()
throws IOException, DataConversionException, ExecutionException,
InterruptedException {
- Properties props = new Properties();
- props.setProperty(CouchbaseWriterConfigurationKeys.BUCKET, "default");
- Config config = ConfigFactory.parseProperties(props);
+ Config config = getConfig("default", Optional.empty(), Optional.empty(),
Optional.empty());
CouchbaseWriter writer = new CouchbaseWriter(_couchbaseEnvironment,
config);
try {
@@ -178,6 +179,69 @@ public class CouchbaseWriterTest {
}
+ private Config getConfig(String bucket, Optional<Integer> ttl,
Optional<TimeUnit> ttlTimeUnit, Optional<String> ttlOriginField) {
+ Properties props = new Properties();
+ props.setProperty(CouchbaseWriterConfigurationKeys.BUCKET, bucket);
+ ttl.ifPresent(x ->
props.setProperty(CouchbaseWriterConfigurationKeys.DOCUMENT_TTL, "" + x));
+ ttlTimeUnit.ifPresent(x ->
props.setProperty(CouchbaseWriterConfigurationKeys.DOCUMENT_TTL_UNIT, "" + x));
+ ttlOriginField.ifPresent(x ->
props.setProperty(CouchbaseWriterConfigurationKeys.DOCUMENT_TTL_ORIGIN_FIELD,
"" + x));
+ return ConfigFactory.parseProperties(props);
+ }
+ /**
+ * Test that a single tuple document can be written successfully with a TTL.
+ * @throws IOException
+ * @throws DataConversionException
+ * @throws ExecutionException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testTupleDocumentWriteWithTtl()
+ throws IOException, DataConversionException, ExecutionException,
InterruptedException {
+ int ttl = 10000;
+ long expiry = Math.toIntExact(System.currentTimeMillis() / 1000) + ttl;
+ Config config = getConfig("default", Optional.of(ttl), Optional.empty(),
Optional.empty());
+
+ CouchbaseWriter writer = new CouchbaseWriter(_couchbaseEnvironment,
config);
+ try {
+ Schema dataRecordSchema =
+
SchemaBuilder.record("Data").fields().name("data").type().bytesType().noDefault().name("flags").type().intType()
+ .noDefault().endRecord();
+
+ Schema schema =
SchemaBuilder.record("TestRecord").fields().name("key").type().stringType().noDefault().name("data")
+ .type(dataRecordSchema).noDefault().endRecord();
+
+ GenericData.Record testRecord = new GenericData.Record(schema);
+
+ String testContent = "hello world";
+
+ GenericData.Record dataRecord = new GenericData.Record(dataRecordSchema);
+ dataRecord.put("data",
ByteBuffer.wrap(testContent.getBytes(Charset.forName("UTF-8"))));
+ dataRecord.put("flags", 0);
+
+ testRecord.put("key", "hello");
+ testRecord.put("data", dataRecord);
+
+ Converter<Schema, String, GenericRecord, TupleDocument> recordConverter
= new AvroToCouchbaseTupleConverter();
+
+ TupleDocument doc = recordConverter.convertRecord("", testRecord,
null).iterator().next();
+ AbstractDocument storedDoc = ((GenericWriteResponse<AbstractDocument>)
writer.write(doc, null).get()).getRawResponse();
+ TupleDocument returnDoc = writer.getBucket().get("hello",
TupleDocument.class);
+
+ byte[] returnedBytes = new
byte[returnDoc.content().value1().readableBytes()];
+ returnDoc.content().value1().readBytes(returnedBytes);
+ Assert.assertEquals(returnedBytes,
testContent.getBytes(Charset.forName("UTF-8")));
+
+ int returnedFlags = returnDoc.content().value2();
+ Assert.assertEquals(returnedFlags, 0);
+
+ // Since get operations do not set the expiry meta, we need to rely
+ // on the document returned by the write call to check the expiry.
+ Assert.assertEquals(storedDoc.expiry() - expiry , 0, 50 );
+
+ } finally {
+ writer.close();
+ }
+
+ }
/**
* Test that a single Json document can be written successfully
* @throws IOException
@@ -208,6 +272,153 @@ public class CouchbaseWriterTest {
}
}
+ /**
+ * Test that a single Json document can be written successfully with TTL
+ * @throws IOException
+ * @throws DataConversionException
+ * @throws ExecutionException
+ * @throws InterruptedException
+ */
+ @Test(groups={"timeout"})
+ public void testJsonDocumentWriteTTL()
+ throws IOException, DataConversionException, ExecutionException,
InterruptedException {
+ int ttl = 1000;
+ int expiry = Math.toIntExact(System.currentTimeMillis() / 1000) + ttl;
+ Config config = getConfig("default", Optional.of(ttl), Optional.empty(),
Optional.empty());
+ CouchbaseWriter writer = new CouchbaseWriter(_couchbaseEnvironment,
config);
+ try {
+
+ String key = "hello";
+ String testContent = "hello world";
+ HashMap<String, String> contentMap = new HashMap<>();
+ contentMap.put("value", testContent);
+ Gson gson = new Gson();
+ String jsonString = gson.toJson(contentMap);
+ RawJsonDocument jsonDocument = RawJsonDocument.create(key, jsonString);
+ AbstractDocument storedDoc = ((GenericWriteResponse<AbstractDocument>)
writer.write(jsonDocument, null).get()).getRawResponse();
+ RawJsonDocument returnDoc = writer.getBucket().get(key,
RawJsonDocument.class);
+
+ Map<String, String> returnedMap = gson.fromJson(returnDoc.content(),
Map.class);
+ Assert.assertEquals(testContent, returnedMap.get("value"));
+ Assert.assertEquals(storedDoc.expiry(), expiry, 50);
+ } finally {
+ writer.close();
+ }
+ }
+
+
+ /**
+ * Test that a single Json document can be written successfully with TTL and time units
+ * @throws IOException
+ * @throws DataConversionException
+ * @throws ExecutionException
+ * @throws InterruptedException
+ */
+ @Test(groups={"timeout"})
+ public void testJsonDocumentWriteTTLWithTimeUnits()
+ throws IOException, DataConversionException, ExecutionException,
InterruptedException {
+ int ttl = 1;
+ TimeUnit timeUnit = TimeUnit.DAYS;
+ int expiry = Math.toIntExact(System.currentTimeMillis() / 1000) + (int)
TimeUnit.SECONDS.convert(ttl, timeUnit);
+ Config config = getConfig("default", Optional.of(ttl),
Optional.of(timeUnit), Optional.empty());
+ CouchbaseWriter writer = new CouchbaseWriter(_couchbaseEnvironment,
config);
+ try {
+
+ String key = "hello";
+ String testContent = "hello world";
+ HashMap<String, String> contentMap = new HashMap<>();
+ contentMap.put("value", testContent);
+ Gson gson = new Gson();
+ String jsonString = gson.toJson(contentMap);
+ RawJsonDocument jsonDocument = RawJsonDocument.create(key, jsonString);
+ AbstractDocument storedDoc = ((GenericWriteResponse<AbstractDocument>)
writer.write(jsonDocument, null).get()).getRawResponse();
+ RawJsonDocument returnDoc = writer.getBucket().get(key,
RawJsonDocument.class);
+
+ Map<String, String> returnedMap = gson.fromJson(returnDoc.content(),
Map.class);
+ Assert.assertEquals(testContent, returnedMap.get("value"));
+ Assert.assertEquals(storedDoc.expiry() - expiry, 0, 50);
+ } finally {
+ writer.close();
+ }
+ }
+
+
+ /**
+ * Test that a single Json document can be written successfully with TTL,
+ * time units, and a TTL origin field
+ * @throws ExecutionException
+ * @throws InterruptedException
+ */
+ @Test(groups={"timeout"})
+ public void testJsonDocumentWriteTtlWithField()
+ throws ExecutionException, InterruptedException {
+ int ttl = 30;
+ int originDiffFromNow = 5;
+ TimeUnit timeUnit = TimeUnit.DAYS;
+ String ttlOriginField = "time";
+ long now = System.currentTimeMillis();
+ long originDelta = TimeUnit.MILLISECONDS.convert(originDiffFromNow,
TimeUnit.DAYS);
+ long origin = now - originDelta;
+ long expiry = TimeUnit.SECONDS.convert(now, TimeUnit.MILLISECONDS) +
TimeUnit.SECONDS.convert(ttl, timeUnit) -
TimeUnit.SECONDS.convert(originDiffFromNow, timeUnit) ;
+
+ Config config = getConfig("default", Optional.of(ttl),
Optional.of(timeUnit), Optional.of(ttlOriginField));
+ CouchbaseWriter writer = new CouchbaseWriter(_couchbaseEnvironment,
config);
+ try {
+
+ String key = "hello";
+ String testContent = "hello world";
+ HashMap<String, String> contentMap = new HashMap<>();
+ contentMap.put("value", testContent);
+ contentMap.put(ttlOriginField, "" + origin);
+ Gson gson = new Gson();
+ String jsonString = gson.toJson(contentMap);
+ RawJsonDocument jsonDocument = RawJsonDocument.create(key, jsonString);
+ AbstractDocument storedDoc = ((GenericWriteResponse<AbstractDocument>)
writer.write(jsonDocument, null).get()).getRawResponse();
+ RawJsonDocument returnDoc = writer.getBucket().get(key,
RawJsonDocument.class);
+
+ Map<String, String> returnedMap = gson.fromJson(returnDoc.content(),
Map.class);
+ Assert.assertEquals(testContent, returnedMap.get("value"));
+ Assert.assertEquals(storedDoc.expiry() , expiry, 50);
+ } finally {
+ writer.close();
+ }
+ }
+
+ @Test(groups={"timeout"})
+ public void testJsonDocumentWriteTtlWithNestedField()
+ throws ExecutionException, InterruptedException {
+ int ttl = 30;
+ int originDiffFromNow = 5;
+ TimeUnit timeUnit = TimeUnit.DAYS;
+ String ttlOriginField = "a.b.time";
+ long now = System.currentTimeMillis();
+ long originDelta = TimeUnit.MILLISECONDS.convert(originDiffFromNow,
timeUnit);
+ long origin = now - originDelta;
+ long expiry = TimeUnit.SECONDS.convert(now, TimeUnit.MILLISECONDS) +
TimeUnit.SECONDS.convert(ttl, timeUnit) -
TimeUnit.SECONDS.convert(originDiffFromNow, timeUnit) ;
+
+ Config config = getConfig("default", Optional.of(ttl),
Optional.of(timeUnit), Optional.of(ttlOriginField));
+ CouchbaseWriter writer = new CouchbaseWriter(_couchbaseEnvironment,
config);
+ try {
+ JsonObject jsonRoot = new JsonObject();
+ String key = "keyValue";
+ String testContent = "hello world";
+ String valueKey = "value";
+ jsonRoot.addProperty(valueKey, testContent);
+ jsonRoot.addProperty(ttlOriginField, origin);
+
+ RawJsonDocument jsonDocument = RawJsonDocument.create(key,
jsonRoot.toString());
+ AbstractDocument storedDoc = ((GenericWriteResponse<AbstractDocument>)
writer.write(jsonDocument, null).get()).getRawResponse();
+ RawJsonDocument returnDoc = writer.getBucket().get(key,
RawJsonDocument.class);
+
+ Map<String, String> returnedMap = new
Gson().fromJson(returnDoc.content(), Map.class);
+ Assert.assertEquals(returnedMap.get(valueKey), testContent);
+ Assert.assertEquals(storedDoc.expiry() , expiry, 50);
+ } finally {
+ writer.close();
+ }
+ }
+
private void drainQueue(BlockingQueue<Pair<AbstractDocument, Future>> queue,
int threshold, long sleepTime,
TimeUnit sleepUnit, List<Pair<AbstractDocument, Future>> failedFutures) {
while (queue.remainingCapacity() < threshold) {
@@ -367,10 +578,7 @@ public class CouchbaseWriterTest {
private void writeRecordsWithAsyncWriter(Iterator<AbstractDocument>
recordIterator)
throws IOException {
boolean verbose = false;
- Properties props = new Properties();
- props.setProperty(CouchbaseWriterConfigurationKeys.BUCKET, "default");
- Config config = ConfigFactory.parseProperties(props);
-
+ Config config = getConfig("default", Optional.empty(), Optional.empty(),
Optional.empty());
CouchbaseWriter writer = new CouchbaseWriter(_couchbaseEnvironment,
config);
try {
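The expiry values asserted in the TTL tests above all follow the same arithmetic: convert the origin timestamp from milliseconds to seconds, then add the TTL converted to seconds. A minimal sketch of that calculation follows. It assumes the origin is expressed in milliseconds, as in these tests; `TtlExpirySketch` and `expectedExpirySeconds` are hypothetical names used for illustration and are not part of CouchbaseWriter.

```java
import java.util.concurrent.TimeUnit;

public class TtlExpirySketch {

  // Hypothetical helper: mirrors the arithmetic the tests use to build the
  // expected expiry. Assumes originMillis is a timestamp in milliseconds.
  static long expectedExpirySeconds(long originMillis, long ttl, TimeUnit ttlUnit) {
    return TimeUnit.MILLISECONDS.toSeconds(originMillis) + ttlUnit.toSeconds(ttl);
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    // Origin five days in the past, TTL of thirty days, as in the origin-field tests.
    long origin = now - TimeUnit.DAYS.toMillis(5);
    System.out.println(expectedExpirySeconds(origin, 30, TimeUnit.DAYS));
  }
}
```

With no origin field configured, the same helper applies with the current time as the origin, which matches the plain TTL tests.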
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/ConfigUtils.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/ConfigUtils.java
index e1d70f1..d40c38f 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/ConfigUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/ConfigUtils.java
@@ -17,9 +17,11 @@
package org.apache.gobblin.util;
+import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.StringReader;
import java.nio.file.Path;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -30,6 +32,8 @@ import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import com.google.common.base.Function;
@@ -68,6 +72,13 @@ public class ConfigUtils {
* typesafe config does not allow such properties. */
public static final String STRIP_SUFFIX = ".ROOT_VALUE";
+ /**
+ * Available TimeUnit values that can be parsed from a given String
+ */
+ private static final Set<String> validTimeUnits =
Arrays.stream(TimeUnit.values())
+ .map(TimeUnit::name)
+ .collect(Collectors.toSet());
+
public ConfigUtils(FileUtils fileUtils) {
this.fileUtils = fileUtils;
}
@@ -327,6 +338,23 @@ public class ConfigUtils {
}
/**
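+ * Return TimeUnit value at <code>path</code> if <code>config</code> has path. If not return <code>def</code>
+ *
+ * @param config in which the path may be present
+ * @param path key to look for in the config object
+ * @param def default value to return if <code>path</code> is absent
+ * @return TimeUnit value at <code>path</code> if <code>config</code> has path. If not return <code>def</code>
+ * @throws IllegalArgumentException if the value at <code>path</code> is not a valid TimeUnit name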
+ */
+ public static TimeUnit getTimeUnit(Config config, String path, TimeUnit def) {
+ if (config.hasPath(path)) {
+ String timeUnit = config.getString(path).toUpperCase();
+ Preconditions.checkArgument(validTimeUnits.contains(timeUnit),
+ "Passed invalid TimeUnit for %s: '%s'", path, timeUnit);
+ return TimeUnit.valueOf(timeUnit);
+ }
+ return def;
+ }
+
+ /**
* Return {@link Long} value at <code>path</code> if <code>config</code> has
path. If not return <code>def</code>
*
* @param config in which the path may be present
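The lookup-with-default pattern used by getTimeUnit can be tried without the Typesafe Config dependency. The sketch below is a stand-in, not the Gobblin implementation: it assumes a plain `java.util.Properties` source, and `TimeUnitLookupSketch` is a hypothetical class name. The key `documentTTLUnits` is used only as an illustrative property name.

```java
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class TimeUnitLookupSketch {

  // Hypothetical stand-in for ConfigUtils.getTimeUnit, backed by Properties
  // instead of a Typesafe Config object.
  static TimeUnit getTimeUnit(Properties props, String key, TimeUnit def) {
    String raw = props.getProperty(key);
    if (raw == null) {
      return def; // key absent: fall back to the caller's default
    }
    try {
      // Case-insensitive match against the TimeUnit enum names.
      return TimeUnit.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      throw new IllegalArgumentException(
          String.format("Invalid TimeUnit for %s: '%s'", key, raw), e);
    }
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("documentTTLUnits", "days");
    System.out.println(getTimeUnit(props, "documentTTLUnits", TimeUnit.SECONDS));
    System.out.println(getTimeUnit(props, "missing.key", TimeUnit.MINUTES));
  }
}
```

Relying on `TimeUnit.valueOf` to reject unknown names avoids maintaining a separate set of valid values, at the cost of wrapping the exception to report the offending key.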
diff --git
a/gobblin-utility/src/test/java/org/apache/gobblin/util/ConfigUtilsTest.java
b/gobblin-utility/src/test/java/org/apache/gobblin/util/ConfigUtilsTest.java
index d4395f7..ebf3dce 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/ConfigUtilsTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/ConfigUtilsTest.java
@@ -29,6 +29,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import org.jasypt.util.text.BasicTextEncryptor;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -299,6 +300,37 @@ public class ConfigUtilsTest {
keyFile.delete();
}
+ @Test
+ public void testGetTimeUnitValid() {
+ String key = "a.b.c";
+ TimeUnit expectedTimeUnit = TimeUnit.DAYS;
+ TimeUnit defaultTimeUnit = TimeUnit.MILLISECONDS;
+ Config cfg = ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
+ .put(key, TimeUnit.DAYS.name())
+ .build());
+ TimeUnit timeUnit = ConfigUtils.getTimeUnit(cfg, key, defaultTimeUnit);
+ Assert.assertEquals(timeUnit, expectedTimeUnit);
+ }
+
+ @Test
+ public void testGetTimeUnitInvalid() {
+ String key = "a.b.c";
+ final Config cfg = ConfigFactory.parseMap(ImmutableMap.<String,
Object>builder()
+ .put(key, "INVALID_TIME_UNIT")
+ .build());
+ Assert.assertThrows(IllegalArgumentException.class, () -> {
+ ConfigUtils.getTimeUnit(cfg, key, TimeUnit.SECONDS);
+ });
+ }
+
+ @Test
+ public void testGetTimeUnitDefault() {
+ String key = "a.b.c";
+ TimeUnit defaultTimeUnit = TimeUnit.MINUTES;
+ final Config cfg = ConfigFactory.empty();
+ Assert.assertEquals(ConfigUtils.getTimeUnit(cfg, key, defaultTimeUnit),
defaultTimeUnit);
+ }
+
private File newKeyFile(String masterPwd) throws IOException {
File masterPwdFile = File.createTempFile("masterPassword", null);
masterPwdFile.deleteOnExit();
diff --git a/mkdocs.yml b/mkdocs.yml
index 5c4a345..69996b6 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -62,6 +62,8 @@ pages:
- SQL Server: sources/SqlServerSource.md
- Teradata: sources/TeradataSource.md
- Wikipedia: sources/WikipediaSource.md
+ - Writers:
+ - CouchbaseWriter: writers/CouchbaseWriter.md
- Record Sinks:
- Avro HDFS: sinks/AvroHdfsDataWriter.md
- Parquet HDFS: sinks/ParquetHdfsDataWriter.md