[ https://issues.apache.org/jira/browse/BEAM-2661?focusedWorklogId=129448&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-129448 ]

ASF GitHub Bot logged work on BEAM-2661:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 31/Jul/18 19:00
            Start Date: 31/Jul/18 19:00
    Worklog Time Spent: 10m 
      Work Description: timrobertson100 commented on a change in pull request #6021: [BEAM-2661] Adds KuduIO
URL: https://github.com/apache/beam/pull/6021#discussion_r206646527
 
 

 ##########
 File path: sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/KuduIO.java
 ##########
 @@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kudu;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Splitter;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.kudu.Common;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduPredicate;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.RowResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A bounded source and sink for Kudu.
+ *
+ * <p>For more information, see the online documentation at <a
+ * href="https://kudu.apache.org/">Kudu</a>.
+ *
+ * <h3>Reading from Kudu</h3>
+ *
+ * <p>{@code KuduIO} provides a source to read from Kudu and returns a bounded collection of
+ * entities as {@code PCollection&lt;T&gt;}. An entity is built by parsing a Kudu {@link RowResult}
+ * using the provided {@link SerializableFunction SerializableFunction&lt;RowResult, T&gt;}.
+ *
+ * <p>The following example illustrates various options for configuring the IO:
+ *
+ * <pre>{@code
+ * pipeline.apply(
+ *     KuduIO.<String>read()
+ *         .withMasterAddresses("kudu1:8051,kudu2:8051,kudu3:8051")
+ *         .withTable("table")
+ *         .withParseFn(
+ *             (SerializableFunction<RowResult, String>) input -> input.getString(COL_NAME))
+ *         .withCoder(StringUtf8Coder.of()));
+ *     // the options above illustrate a typical minimum set and return a PCollection<String>
+ * }</pre>
+ *
+ * <p>{@code withCoder(...)} may be omitted if the coder can be inferred from the {@link
+ * CoderRegistry}. However, when using a lambda expression or an anonymous inner class to define
+ * the function, type erasure will prohibit this. In such cases you are required to set the coder
+ * explicitly, as in the example above.
+ *
+ * <p>Optionally, you can provide {@code withPredicates(...)} to apply a query that filters rows
+ * from the Kudu table.
+ *
+ * <p>Optionally, you can provide {@code withProjectedColumns(...)} to limit the columns returned
+ * from the Kudu scan, improving performance. The columns required in the {@code ParseFn} must be
+ * declared in the projected columns.
+ *
+ * <p>Optionally, you can provide {@code withBatchSize(...)} to set the number of bytes returned
+ * from the Kudu scanner in each batch.
+ *
+ * <p>Optionally, you can provide {@code withFaultTolerent(...)} to instruct the scan to resume on
+ * another tablet server if the current server fails.
+ *
+ * <h3>Writing to Kudu</h3>
+ *
+ * <p>The Kudu sink executes a set of operations on a single table. It takes as input a {@link
+ * PCollection PCollection&lt;T&gt;} and a {@link FormatFunction FormatFunction&lt;T&gt;} which is
+ * responsible for converting the input into an idempotent transformation on a row.
+ *
+ * <p>To configure a Kudu sink, you must supply the Kudu master addresses, the table name and a
+ * {@link FormatFunction} to convert the input records, for example:
+ *
+ * <pre>{@code
+ * PCollection<MyType> data = ...;
+ * FormatFunction<MyType> fn = ...;
+ *
+ * data.apply("write",
+ *     KuduIO.write()
+ *         .withMasterAddresses("kudu1:8051,kudu2:8051,kudu3:8051")
+ *         .withTable("table")
+ *         .withFormatFn(fn));
+ * }</pre>
+ *
+ * <h3>Experimental</h3>
+ *
+ * <p>{@code KuduIO} does not support authentication in this release.
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class KuduIO {
+  private static final Logger LOG = LoggerFactory.getLogger(KuduIO.class);
+
+  private KuduIO() {}
+
+  public static <T> Read<T> read() {
+    return new AutoValue_KuduIO_Read.Builder<T>().setKuduService(new KuduServiceImpl<>()).build();
+  }
+
+  public static <T> Write<T> write() {
+    return new AutoValue_KuduIO_Write.Builder<T>().setKuduService(new KuduServiceImpl<>()).build();
+  }
+
+  /**
+   * An interface used by the KuduIO Write to convert an input record into an Operation to apply
+   * as a mutation in Kudu.
+   */
+  @FunctionalInterface
+  public interface FormatFunction<T> extends SerializableFunction<TableAndRecord<T>, Operation> {}
+
+  /** Implementation of {@link KuduIO#read()}. */
+  @AutoValue
+  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+    @Nullable
+    abstract List<String> getMasterAddresses();
+
+    @Nullable
+    abstract String getTable();
+
+    @Nullable
+    abstract Integer getBatchSize();
+
+    @Nullable
+    abstract List<String> getProjectedColumns();
+
+    @Nullable
+    abstract List<Common.ColumnPredicatePB> getSerializablePredicates();
+
+    @Nullable
+    abstract Boolean getFaultTolerent();
+
+    @Nullable
+    abstract SerializableFunction<RowResult, T> getParseFn();
+
+    @Nullable
+    abstract Coder<T> getCoder();
+
+    @Nullable
+    abstract KuduService<T> getKuduService();
+
+    abstract Builder<T> builder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setMasterAddresses(List<String> masterAddresses);
+
+      abstract Builder<T> setTable(String table);
+
+      abstract Builder<T> setBatchSize(Integer batchSize);
+
+      abstract Builder<T> setProjectedColumns(List<String> projectedColumns);
+
+      abstract Builder<T> setSerializablePredicates(
+          List<Common.ColumnPredicatePB> serializablePredicates);
+
+      abstract Builder<T> setFaultTolerent(Boolean faultTolerent);
+
+      abstract Builder<T> setParseFn(SerializableFunction<RowResult, T> parseFn);
+
+      abstract Builder<T> setCoder(Coder<T> coder);
+
+      abstract Builder<T> setKuduService(KuduService<T> kuduService);
+
+      abstract Read<T> build();
+    }
+
+    @VisibleForTesting
+    Coder<T> inferCoder(CoderRegistry coderRegistry) {
+      try {
+        return getCoder() != null
+            ? getCoder()
+            : coderRegistry.getCoder(TypeDescriptors.outputOf(getParseFn()));
+      } catch (CannotProvideCoderException e) {
+        throw new IllegalArgumentException(
+            "Unable to infer coder for output of parseFn ("
+                + TypeDescriptors.outputOf(getParseFn())
+                + "). Specify it explicitly using withCoder().",
+            e);
+      }
+    }
+
+    /** Reads from the Kudu cluster on the specified master addresses. */
+    public Read<T> withMasterAddresses(String masterAddresses) {
+      checkArgument(masterAddresses != null, "masterAddresses cannot be null or empty");
+      return builder().setMasterAddresses(Splitter.on(",").splitToList(masterAddresses)).build();
+    }
+
+    /** Reads from the specified table. */
+    public Read<T> withTable(String table) {
+      checkArgument(table != null, "table cannot be null");
+      return builder().setTable(table).build();
+    }
+
+    /** Provides the function to parse a row from Kudu into the typed object. */
+    public Read<T> withParseFn(SerializableFunction<RowResult, T> parseFn) {
+      checkArgument(parseFn != null, "parseFn cannot be null");
+      return builder().setParseFn(parseFn).build();
+    }
+
+    /** Filters the rows read from Kudu using the given predicates. */
+    public Read<T> withPredicates(List<KuduPredicate> predicates) {
+      checkArgument(predicates != null, "predicates cannot be null");
+      // reuse the Kudu protobuf serialization mechanism
+      List<Common.ColumnPredicatePB> serializablePredicates =
+          predicates.stream().map(KuduPredicate::toPB).collect(Collectors.toList());
+      return builder().setSerializablePredicates(serializablePredicates).build();
+    }
+
+    /** Filters the columns read from the table to include only those specified. */
+    public Read<T> withProjectedColumns(List<String> projectedColumns) {
+      checkArgument(projectedColumns != null, "projectedColumns cannot be null");
+      return builder().setProjectedColumns(projectedColumns).build();
+    }
+
+    /** Reads from the table in batches of the specified size. */
+    public Read<T> withBatchSize(int batchSize) {
+      checkArgument(batchSize >= 0, "batchSize must not be negative");
+      return builder().setBatchSize(batchSize).build();
+    }
+
+    /**
+     * Instructs the scan to resume on another tablet server if the current server fails, when
+     * faultTolerent is set to true.
+     */
+    public Read<T> withFaultTolerent(boolean faultTolerent) {
+      return builder().setFaultTolerent(faultTolerent).build();
+    }
+
+    /**
+     * Sets a {@link Coder} for the result of the parse function. This may be required if a coder
+     * cannot be inferred automatically.
+     */
+    public Read<T> withCoder(Coder<T> coder) {
+      checkArgument(coder != null, "coder cannot be null");
+      return builder().setCoder(coder).build();
+    }
+
+    /** Specifies an instance of {@link KuduService} used to connect and read from Kudu. */
+    @VisibleForTesting
+    Read<T> withKuduService(KuduService<T> kuduService) {
+      checkArgument(kuduService != null, "kuduService cannot be null");
+      return builder().setKuduService(kuduService).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PBegin input) {
+      Pipeline p = input.getPipeline();
+      final Coder<T> coder = inferCoder(p.getCoderRegistry());
+      return input.apply(org.apache.beam.sdk.io.Read.from(new KuduSource<>(this, coder, null)));
+    }
+
+    @Override
+    public void validate(PipelineOptions pipelineOptions) {
+      checkState(
+          getMasterAddresses() != null,
+          "KuduIO.read() requires a list of master addresses to be set via withMasterAddresses(masterAddresses)");
+      checkState(
+          getTable() != null,
+          "KuduIO.read() requires a table name to be set via withTable(table)");
+      checkState(
+          getParseFn() != null,
+          "KuduIO.read() requires a parse function to be set via withParseFn(parseFn)");
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("masterAddresses", getMasterAddresses().toString()));
+      builder.add(DisplayData.item("table", getTable()));
+    }
+  }
+
+  static class KuduSource<T> extends BoundedSource<T> {
+    final Read<T> spec;
+    private final Coder<T> coder;
+    @Nullable byte[] serializedToken; // only set during a split
+
+    KuduSource(Read<T> spec, Coder<T> coder, byte[] serializedToken) {
+      this.spec = spec;
+      this.coder = coder;
+      this.serializedToken = serializedToken;
+    }
+
+    // A Kudu source can be split only once, providing one source per tablet
+    @Override
+    public List<BoundedSource<T>> split(long desiredBundleSizeBytes, PipelineOptions options)
+        throws KuduException {
+      if (serializedToken != null) {
+        return Collections.singletonList(this); // we are already a split
+
+      } else {
+        Stream<BoundedSource<T>> sources =
+            spec.getKuduService()
+                .createTabletScanners(spec)
+                .stream()
+                .map(s -> new KuduIO.KuduSource<T>(spec, spec.getCoder(), s));
+        return sources.collect(Collectors.toList());
+      }
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options) {
+      return 0; // Kudu does not expose tablet sizes
+    }
+
+    @Override
+    public BoundedReader<T> createReader(PipelineOptions options) {
+      return spec.getKuduService().createReader(this);
+    }
+
+    @Override
+    public Coder<T> getOutputCoder() {
+      return coder;
+    }
+  }
+
+  /**
+   * A {@link PTransform} that writes to Kudu. See the class-level Javadoc on {@link KuduIO} for
+   * more information.
+   *
+   * @see KuduIO
+   */
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    @Nullable
+    abstract List<String> masterAddresses();
+
+    @Nullable
+    abstract String table();
+
+    @Nullable
+    abstract FormatFunction<T> formatFn();
+
+    @Nullable
+    abstract KuduService<T> kuduService();
+
+    abstract Builder<T> builder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setMasterAddresses(List<String> masterAddresses);
+
+      abstract Builder<T> setTable(String table);
+
+      abstract Builder<T> setFormatFn(FormatFunction<T> formatFn);
+
+      abstract Builder<T> setKuduService(KuduService<T> kuduService);
+
+      abstract Write<T> build();
+    }
+
+    /** Writes to the Kudu cluster on the specified master addresses. */
+    public Write<T> withMasterAddresses(String masterAddresses) {
+      checkArgument(masterAddresses != null, "masterAddresses cannot be null or empty");
+      return builder().setMasterAddresses(Splitter.on(",").splitToList(masterAddresses)).build();
+    }
+
+    /** Writes to the specified table. */
+    public Write<T> withTable(String table) {
+      checkArgument(table != null, "table cannot be null");
+      return builder().setTable(table).build();
+    }
+
+    /** Writes using the given function to create the mutation operations from the input. */
+    public Write<T> withFormatFn(FormatFunction<T> formatFn) {
+      checkArgument(formatFn != null, "formatFn cannot be null");
+      return builder().setFormatFn(formatFn).build();
+    }
+
+    /** Specifies the {@link KuduService} used to connect and write into the Kudu table. */
+    @VisibleForTesting
+    Write<T> withKuduService(KuduService<T> kuduService) {
+      checkArgument(kuduService != null, "kuduService cannot be null");
+      return builder().setKuduService(kuduService).build();
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      input.apply(ParDo.of(new WriteFn(this)));
+      return PDone.in(input.getPipeline());
+    }
+
+    @Override
+    public void validate(PipelineOptions pipelineOptions) {
+      checkState(
+          masterAddresses() != null,
+          "KuduIO.write() requires a list of master addresses to be set via withMasterAddresses(masterAddresses)");
+      checkState(
+          table() != null, "KuduIO.write() requires a table name to be set via withTable(table)");
+      checkState(
+          formatFn() != null,
+          "KuduIO.write() requires a format function to be set via withFormatFn(formatFn)");
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("masterAddresses", masterAddresses().toString()));
+      builder.add(DisplayData.item("tableName", table()));
+      builder.add(DisplayData.item("formatFn", formatFn().getClass().getCanonicalName()));
+    }
+
+    private class WriteFn extends DoFn<T, Void> {
+      private final Write<T> spec;
+      private KuduService.Writer writer;
+
+      WriteFn(Write<T> spec) {
+        this.spec = spec;
+      }
+
+      @Setup
+      public void setup() throws KuduException {
+        writer = spec.kuduService().createWriter(spec);
+      }
+
+      @StartBundle
+      public void startBundle(StartBundleContext context) throws KuduException {
+        writer.openSession();
+      }
+
+      @ProcessElement
 
 Review comment:
   Kudu requires a primary key, so repeats would _usually_ be seen as `Upsert` operations. That is why in the JDoc I said:
   > ... a {@link FormatFunction&lt;T&gt;} which is responsible for converting the input into an idempotent transformation on a row
   
   The tests provide an example of that in the `GenerateUpsert` method.
   
   However, people can get creative in their format function (e.g. mint UUIDs) and then you could potentially force duplicates. This is similar to how I recently patched `ElasticSearchIO` to allow ID functions that enable doc ID and upsert behaviour.
   
   I did originally attempt to enforce Upsert behaviour using the Kudu classes, but they simply do not lend themselves to serialization. As the alternative, I opted to model this as closely as possible on the other IOs.
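To make the idempotency point concrete, here is a minimal, self-contained sketch (plain Java, no Kudu dependency, all names illustrative) of why keying writes on a primary key makes retried upserts safe; the table is modeled as a simple map:

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: models upsert-on-primary-key semantics with a map. */
public class UpsertIdempotencyDemo {

  /** Apply an upsert: insert the row, or overwrite it if the key exists. */
  static void upsert(Map<String, String> table, String key, String value) {
    table.put(key, value);
  }

  public static void main(String[] args) {
    Map<String, String> table = new HashMap<>();
    // A retried bundle re-applies the same formatted operations...
    for (int attempt = 0; attempt < 2; attempt++) {
      upsert(table, "row-1", "alice");
      upsert(table, "row-2", "bob");
    }
    // ...but the table state is the same as after a single application.
    System.out.println(table.size()); // prints 2, not 4
  }
}
```

A format function that mints a fresh UUID per attempt would break exactly this property, since each retry would target a different primary key.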

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 129448)
    Time Spent: 4h 10m  (was: 4h)

> Add KuduIO
> ----------
>
>                 Key: BEAM-2661
>                 URL: https://issues.apache.org/jira/browse/BEAM-2661
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-ideas
>            Reporter: Jean-Baptiste Onofré
>            Assignee: Tim Robertson
>            Priority: Major
>          Time Spent: 4h 10m
>  Remaining Estimate: 0h
>
> New IO for Apache Kudu ([https://kudu.apache.org/overview.html]).
> This work is in progress [on this branch|https://github.com/timrobertson100/beam/tree/BEAM-2661-KuduIO] with design aspects documented below.
> h2. The API
> The {{KuduIO}} API requires the user to provide a function to convert objects 
> into operations. This is similar to the {{JdbcIO}} but different to others, 
> such as {{HBaseIO}} which requires a pre-transform stage beforehand to 
> convert into the mutations to apply. It was originally intended to copy the 
> {{HBaseIO}} approach, but this was not possible:
>  # The Kudu 
> [Operation|https://kudu.apache.org/apidocs/org/apache/kudu/client/Operation.html]
>  is a fat class, and is a subclass of {{KuduRpc<OperationResponse>}}. It 
> holds RPC logic, callbacks and a Kudu client. Because of this the 
> {{Operation}} does not serialize and furthermore, the logic for encoding the 
> operations (Insert, Upsert etc) in the Kudu Java API are one way only (no 
> decode) because the server is written in C++.
>  # An alternative could be to introduce a new object to beam (e.g. 
> {{o.a.b.sdk.io.kudu.KuduOperation}}) to enable 
> {{PCollection<KuduOperation>}}. This was considered but was discounted 
> because:
>  ## It is not a familiar API to those already knowing Kudu
>  ## It still requires serialization and deserialization of the operations. 
> Using the existing Kudu approach of serializing into compact byte arrays 
> would require a decoder along the lines of [this almost complete 
> example|https://gist.github.com/timrobertson100/df77d1337ba8f5609319751ee7c6e01e].
>  This is possible but has fragilities given the Kudu code itself continues to 
> evolve. 
>  ## It becomes a trivial codebase in Beam to maintain by defer the object to 
> mutation mapping to within the KuduIO transform. {{JdbcIO}} gives us the 
> precedent to do this.
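The shape of the user-supplied conversion function in this design can be sketched generically as follows; the `Record`, `RowOp`, and `FormatFn` types are hypothetical stand-ins for the real Kudu and Beam classes, used only to show why the function itself must be serializable while the operation it produces need not be:

```java
import java.io.Serializable;
import java.util.function.Function;

/** Illustrative only: the user supplies a serializable record-to-operation function. */
public class FormatFnSketch {

  /** Hypothetical input record and row operation, standing in for Kudu types. */
  record Record(String id, String name) {}

  record RowOp(String kind, String key, String value) {}

  /** Serializable so a runner can ship the function to workers, as Beam requires. */
  interface FormatFn extends Function<Record, RowOp>, Serializable {}

  public static void main(String[] args) {
    // Keying the operation on the record id keeps retries idempotent (upsert).
    FormatFn fn = r -> new RowOp("UPSERT", r.id(), r.name());
    RowOp op = fn.apply(new Record("42", "alice"));
    System.out.println(op.kind() + ":" + op.key()); // prints UPSERT:42
  }
}
```

The non-serializable Kudu `Operation` is only ever created on the worker, inside the transform, so it never has to cross a serialization boundary.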
> h2. Testing framework
> Kudu is written in C++. While a [TestMiniKuduCluster|https://github.com/cloudera/kudu/blob/master/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKuduCluster.java] does exist in Java, it requires binaries to be available for the target environment, which is not portable (edit: this is now a [work in progress|https://issues.apache.org/jira/browse/KUDU-2411] in Kudu). Therefore we opt for the following:
>  # Unit tests will use a mock Kudu client.
>  # Integration tests will cover the full aspects of the {{KuduIO}} and use a Docker-based Kudu instance.
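The mock-client approach in point 1 can be sketched like this; the `RowStore` interface and `InMemoryRowStore` fake are hypothetical stand-ins for the real KuduService abstraction, not actual Beam classes:

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: the fake-client pattern used for unit testing an IO. */
public class MockClientSketch {

  /** Hypothetical narrow interface standing in for the real Kudu service. */
  interface RowStore {
    void write(String row);

    List<String> scan();
  }

  /** In-memory fake: unit tests exercise the IO logic without a live cluster. */
  static class InMemoryRowStore implements RowStore {
    private final List<String> rows = new ArrayList<>();

    @Override
    public void write(String row) {
      rows.add(row);
    }

    @Override
    public List<String> scan() {
      return new ArrayList<>(rows); // defensive copy
    }
  }

  public static void main(String[] args) {
    RowStore store = new InMemoryRowStore();
    store.write("row-1");
    store.write("row-2");
    System.out.println(store.scan().size()); // prints 2
  }
}
```

Because the transform only talks to the service interface, swapping the fake for the real client is a one-line change in the test setup, which is what makes the Docker-based integration tests in point 2 sufficient for covering the real cluster behaviour.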



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
