[ 
https://issues.apache.org/jira/browse/BEAM-1893?focusedWorklogId=219474&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-219474
 ]

ASF GitHub Bot logged work on BEAM-1893:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/Mar/19 17:27
            Start Date: 27/Mar/19 17:27
    Worklog Time Spent: 10m 
      Work Description: iemejia commented on pull request #8152: 
[DoNotMerge][BEAM-1893] Implementation of CouchbaseIO
URL: https://github.com/apache/beam/pull/8152#discussion_r269680000
 
 

 ##########
 File path: 
sdks/java/io/couchbase/src/main/java/org/apache/beam/sdk/io/couchbase/CouchbaseIO.java
 ##########
 @@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.couchbase;
+
+import static 
org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
+
+import com.couchbase.client.java.Bucket;
+import com.couchbase.client.java.Cluster;
+import com.couchbase.client.java.CouchbaseCluster;
+import com.couchbase.client.java.document.JsonDocument;
+import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
+import com.couchbase.client.java.query.N1qlQuery;
+import com.couchbase.client.java.query.N1qlQueryResult;
+import com.couchbase.client.java.query.N1qlQueryRow;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import 
org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * IO to read and write data on Couchbase.
+ *
+ * <h3>Reading from Couchbase</h3>
+ *
+ * <p>{@link CouchbaseIO} provides a source that reads data and returns a bounded set of
+ * {@link JsonDocument}s. A {@link JsonDocument} is the JSON form of a Couchbase document.
+ *
+ * <p>The following example illustrates various options for configuring the IO:
+ *
+ * <pre>{@code
+ * pipeline.apply(
+ *            CouchbaseIO.read()
+ *                .withHosts(Arrays.asList("host1", "host2"))
+ *                .withHttpPort(8091) // Optional
+ *                .withCarrierPort(11210) // Optional
+ *                .withBucket("bucket1")
+ *                .withPassword("pwd")) // Bucket-level password
+ *
+ *
+ * }</pre>
+ */
+public class CouchbaseIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CouchbaseIO.class);
+
+  private CouchbaseIO() {}
+
+  /**
+   * Provide a {@link Read} {@link PTransform} to read data from a Couchbase 
database. Here some
+   * default options are provided.
+   *
+   * @return a {@link PTransform} reading data from Couchbase
+   */
+  public static Read read() {
+    return new AutoValue_CouchbaseIO_Read.Builder().build();
+  }
+
+  /**
+   * A {@link PTransform} to read data from Couchbase.
+   *
+   * <p>Every {@code with*} method validates its argument and returns a new {@link Read} obtained
+   * from {@link #builder()}; the instance it is called on is left untouched.
+   */
+  @AutoValue
+  public abstract static class Read extends PTransform<PBegin, PCollection<JsonDocument>> {
+    /** Cluster node addresses, or {@code null} if {@link #withHosts(List)} was not called. */
+    @Nullable
+    abstract List<String> hosts();
+
+    /** HTTP bootstrap port, or {@code null} if {@link #withHttpPort(int)} was not called. */
+    @Nullable
+    abstract Integer httpPort();
+
+    /** Carrier bootstrap port, or {@code null} if {@link #withCarrierPort(int)} was not called. */
+    @Nullable
+    abstract Integer carrierPort();
+
+    /** Bucket name, or {@code null} if {@link #withBucket(String)} was not called. */
+    @Nullable
+    abstract String bucket();
+
+    /** Bucket-level password, or {@code null} if {@link #withPassword(String)} was not called. */
+    @Nullable
+    abstract String password();
+
+    /** Returns a builder used by the {@code with*} methods to derive a modified copy. */
+    abstract Builder builder();
+
+    /** AutoValue builder for {@link Read}. */
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setHosts(List<String> hosts);
+
+      abstract Builder setHttpPort(Integer port);
+
+      abstract Builder setCarrierPort(Integer port);
+
+      abstract Builder setBucket(String bucket);
+
+      abstract Builder setPassword(String password);
+
+      abstract Read build();
+    }
+
+    /**
+     * Define the list of addresses of the cluster nodes.
+     *
+     * <p>The list is copied, so later changes to {@code hosts} do not affect the returned
+     * transform.
+     *
+     * @param hosts addresses of the cluster nodes; must be neither {@code null} nor empty
+     * @return a {@link PTransform} reading data from Couchbase
+     * @throws IllegalArgumentException if {@code hosts} is {@code null} or empty
+     */
+    public Read withHosts(List<String> hosts) {
+      checkArgument(hosts != null, "hosts can not be null");
+      checkArgument(!hosts.isEmpty(), "hosts can not be empty");
+      // Copy into an ArrayList: this decouples the transform from the caller's list and
+      // guarantees the stored list is serializable even if the caller passed a list view.
+      return builder().setHosts(new ArrayList<>(hosts)).build();
+    }
+
+    /**
+     * Define the http port connecting to Couchbase.
+     *
+     * @param port the http port; must be strictly positive
+     * @return a {@link PTransform} reading data from Couchbase
+     * @throws IllegalArgumentException if {@code port} is not positive
+     */
+    public Read withHttpPort(int port) {
+      checkArgument(port > 0, "httpPort must be > 0, but was: %s", port);
+      return builder().setHttpPort(port).build();
+    }
+
+    /**
+     * Define the carrier port connecting to Couchbase.
+     *
+     * @param port the carrier port; must be strictly positive
+     * @return a {@link PTransform} reading data from Couchbase
+     * @throws IllegalArgumentException if {@code port} is not positive
+     */
+    public Read withCarrierPort(int port) {
+      checkArgument(port > 0, "carrierPort must be > 0, but was: %s", port);
+      return builder().setCarrierPort(port).build();
+    }
+
+    /**
+     * Define the name of the bucket to read from.
+     *
+     * @param bucket the bucket name; must not be {@code null}
+     * @return a {@link PTransform} reading data from Couchbase
+     * @throws IllegalArgumentException if {@code bucket} is {@code null}
+     */
+    public Read withBucket(String bucket) {
+      checkArgument(bucket != null, "bucket can not be null");
+      return builder().setBucket(bucket).build();
+    }
+
+    /**
+     * Define the bucket-level password of the target bucket.
+     *
+     * @param password the password; must not be {@code null}
+     * @return a {@link PTransform} reading data from Couchbase
+     * @throws IllegalArgumentException if {@code password} is {@code null}
+     */
+    public Read withPassword(String password) {
+      checkArgument(password != null, "password can not be null");
+      return builder().setPassword(password).build();
+    }
+
+    /**
+     * Checks that the hosts and the bucket have been configured, then applies a read from a new
+     * {@link CouchbaseSource} built from this transform.
+     *
+     * @param input the beginning of the pipeline
+     * @return the documents produced by the source
+     * @throws IllegalArgumentException if the hosts or the bucket have not been set
+     */
+    @Override
+    public PCollection<JsonDocument> expand(PBegin input) {
+      checkArgument(hosts() != null, "withHosts() is required");
+      checkArgument(bucket() != null, "withBucket() is required");
+
+      CouchbaseSource source = new CouchbaseSource(this);
+      PCollection<JsonDocument> result = input.apply(org.apache.beam.sdk.io.Read.from(source));
+      // Disconnect the client from Couchbase
+      // NOTE(review): close() is invoked immediately after apply(), presumably before any data
+      // has been read from the source - confirm that closing at this point is intended.
+      source.close();
+      return result;
+    }
+  }
+
+  @VisibleForTesting
+  static class CouchbaseSource extends BoundedSource<JsonDocument> {
+
+    /** Configuration of the read this source was created from. */
+    private final Read spec;
+
+    private int itemCount;
+
+    /** Lower bound of key range (included). */
+    private final int lowerBound;
+
+    /** Upper bound of key range (excluded). */
+    private final int upperBound;
+
+    private Cluster cluster;
+    private Bucket bucket;
+
+    /**
+     * Creates a source for {@code spec} with no cluster, no bucket and both key bounds set to 0.
+     *
+     * @param spec the read configuration
+     */
+    CouchbaseSource(Read spec) {
+      this(spec, null, null, 0, 0);
+    }
+
+    /**
+     * Creates a source over the key range from {@code lb} to {@code ub}.
+     *
+     * @param spec the read configuration
+     * @param cluster the cluster handle to use, may be {@code null}
+     * @param bucket the bucket handle to use, may be {@code null}
+     * @param lb lower bound of the key range (included)
+     * @param ub upper bound of the key range (excluded)
+     */
+    CouchbaseSource(Read spec, Cluster cluster, Bucket bucket, int lb, int ub) {
+      this.lowerBound = lb;
+      this.upperBound = ub;
+      this.spec = spec;
+      this.bucket = bucket;
+      this.cluster = cluster;
+    }
+
+    /**
+     * Returns the bucket handle, first creating the cluster connection and opening the bucket if
+     * either of them has not been set yet.
+     *
+     * @return the bucket named by the read configuration
+     */
+    private Bucket getBucket() {
+      if (cluster == null) {
+        DefaultCouchbaseEnvironment.Builder envBuilder = DefaultCouchbaseEnvironment.builder();
+        // Only override the bootstrap ports that were explicitly configured.
+        Integer httpPort = spec.httpPort();
+        if (httpPort != null) {
+          envBuilder.bootstrapHttpDirectPort(httpPort);
+        }
+        Integer carrierPort = spec.carrierPort();
+        if (carrierPort != null) {
+          envBuilder.bootstrapCarrierDirectPort(carrierPort);
+        }
+        cluster = CouchbaseCluster.create(envBuilder.build(), spec.hosts());
+      }
+
+      if (bucket != null) {
+        return bucket;
+      }
+
+      // Couchbase Server versions before 5.0 support buckets without a password. From 5.0 on, a
+      // newly created user should have a username equal to the bucket name, and a password.
+      // For more information, see
+      // https://docs.couchbase.com/java-sdk/2.7/sdk-authentication-overview.html#legacy-connection-code
+      String password = spec.password();
+      if (password == null) {
+        bucket = cluster.openBucket(spec.bucket());
+      } else {
+        bucket = cluster.openBucket(spec.bucket(), password);
+      }
+      return bucket;
+    }
+
+    /** Returns a {@link SerializableCoder} for {@link JsonDocument}. */
+    @Override
+    public Coder<JsonDocument> getOutputCoder() {
+      final Coder<JsonDocument> documentCoder = SerializableCoder.of(JsonDocument.class);
+      return documentCoder;
+    }
+
+    @Override
+    public List<? extends BoundedSource<JsonDocument>> split(
+        long desiredBundleSize, PipelineOptions options) {
+      // If the desiredBundleSize equals to 0, it means that there will be 
only one bundle of data
+      // to be read.
+      int totalBundle =
 
 Review comment:
   Since itemCount/estimatedSizeBytes may not be set yet at this point, you may need to call
   `long estimatedSizeBytes = getEstimatedSizeBytes(options);` first.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 219474)

> Add IO module for Couchbase
> ---------------------------
>
>                 Key: BEAM-1893
>                 URL: https://issues.apache.org/jira/browse/BEAM-1893
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-ideas
>            Reporter: Xu Mingmin
>            Assignee: LI Guobao
>            Priority: Major
>              Labels: Couchbase, IO, features, triaged
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Create a {{CouchbaseIO}} for Couchbase database.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to