[https://issues.apache.org/jira/browse/BEAM-2661?focusedWorklogId=129415&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-129415]
ASF GitHub Bot logged work on BEAM-2661:
----------------------------------------
Author: ASF GitHub Bot
Created on: 31/Jul/18 18:17
Start Date: 31/Jul/18 18:17
Worklog Time Spent: 10m
Work Description: reuvenlax commented on a change in pull request #6021:
[BEAM-2661] Adds KuduIO
URL: https://github.com/apache/beam/pull/6021#discussion_r206629985
##########
File path:
sdks/java/io/kudu/src/main/java/org/apache/beam/sdk/io/kudu/KuduIO.java
##########
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kudu;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Splitter;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.kudu.Common;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduPredicate;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.RowResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A bounded source and sink for Kudu.
+ *
+ * <p>For more information, see the online documentation at <a
+ * href="https://kudu.apache.org/">Kudu</a>.
+ *
+ * <h3>Reading from Kudu</h3>
+ *
+ * <p>{@code KuduIO} provides a source that reads a Kudu table and returns a bounded collection of
+ * entities as {@code PCollection<T>}. An entity is built by parsing a Kudu {@link RowResult} using
+ * the provided {@code SerializableFunction<RowResult, T>}.
+ *
+ * <p>The following example illustrates various options for configuring the IO:
+ *
+ * <pre>{@code
+ * pipeline.apply(
+ * KuduIO.<String>read()
+ * .withMasterAddresses("kudu1:8051,kudu2:8051,kudu3:8051")
+ * .withTable("table")
+ * .withParseFn(
+ * (SerializableFunction<RowResult, String>) input -> input.getString(COL_NAME))
+ * .withCoder(StringUtf8Coder.of()));
+ * // above options illustrate a typical minimum set, returns PCollection<String>
+ * }</pre>
+ *
+ * <p>{@code withCoder(...)} may be omitted if the coder can be inferred from the {@link
+ * CoderRegistry}. However, when using a lambda expression or an anonymous inner class to define the
+ * function, type erasure will prevent this. In such cases you must set the coder explicitly, as in
+ * the above example.
+ *
+ * <p>Optionally, you can provide {@code withPredicates(...)} to apply a query that filters rows from
+ * the Kudu table.
+ *
+ * <p>Optionally, you can provide {@code withProjectedColumns(...)} to limit the columns returned
+ * from the Kudu scan to improve performance. The columns required in the {@code ParseFn} must be
+ * declared in the projected columns.
+ *
+ * <p>Optionally, you can provide {@code withBatchSize(...)} to set the number of bytes returned
+ * from the Kudu scanner in each batch.
+ *
+ * <p>Optionally, you can provide {@code withFaultTolerent(...)} to make the scan resume on another
+ * tablet server if the current server fails.
+ *
+ * <h3>Writing to Kudu</h3>
+ *
+ * <p>The Kudu sink executes a set of operations on a single table. It takes as input a {@link
+ * PCollection PCollection<T>} and a {@link FormatFunction FormatFunction<T>}, which is responsible
+ * for converting the input into an idempotent transformation on a row.
+ *
+ * <p>To configure a Kudu sink, you must supply the Kudu master addresses, the table name and a
+ * {@link FormatFunction} to convert the input records, for example:
+ *
+ * <pre>{@code
+ * PCollection<MyType> data = ...;
+ * FormatFunction<MyType> fn = ...;
+ *
+ * data.apply("write",
+ * KuduIO.<MyType>write()
+ * .withMasterAddresses("kudu1:8051,kudu2:8051,kudu3:8051")
+ * .withTable("table")
+ * .withFormatFn(fn));
+ * }</pre>
+ *
+ * <h3>Experimental</h3>
+ *
+ * {@code KuduIO} does not support authentication in this release.
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class KuduIO {
+ private static final Logger LOG = LoggerFactory.getLogger(KuduIO.class);
+
+ private KuduIO() {}
+
+ public static <T> Read<T> read() {
+ return new AutoValue_KuduIO_Read.Builder<T>().setKuduService(new KuduServiceImpl<>()).build();
+ }
Review comment:
I would recommend having some common helper functions here so that coders
don't always need to be set (e.g. readBytes -> byte[], readStrings -> String,
etc.). However, this can be done in a later PR.
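The reviewer's suggestion, and the Javadoc's note about lambdas defeating coder inference, can be sketched in plain Java. This is a hypothetical model, not Beam code: `ReadSketch`, `readStrings()` and the string coder names stand in for `KuduIO.Read`, a possible convenience factory and real `Coder` instances. `outputTypeOf` shows why inference fails for lambdas: a lambda's runtime class exposes `Function` without its type arguments. (In this JDK-only check an anonymous class does keep its type arguments.)

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

/** Hypothetical stand-in for KuduIO.Read: a parse function plus an optional coder name. */
final class ReadSketch<T> {
  private final Function<String, T> parseFn;
  private final String coder; // null means "try to infer it"

  private ReadSketch(Function<String, T> parseFn, String coder) {
    this.parseFn = parseFn;
    this.coder = coder;
  }

  /** Generic entry point: with a lambda, the caller must also call withCoder(...). */
  static <T> ReadSketch<T> read(Function<String, T> parseFn) {
    return new ReadSketch<>(parseFn, null);
  }

  /** Convenience entry point in the spirit of the review: the coder is preset. */
  static ReadSketch<String> readStrings(Function<String, String> parseFn) {
    return new ReadSketch<>(parseFn, "StringUtf8Coder");
  }

  ReadSketch<T> withCoder(String coderName) {
    return new ReadSketch<>(parseFn, coderName);
  }

  /** Returns the explicit coder, otherwise tries to infer one from the function's output type. */
  String resolveCoder() {
    if (coder != null) {
      return coder;
    }
    Type output = outputTypeOf(parseFn);
    if (output == String.class) {
      return "StringUtf8Coder";
    }
    throw new IllegalStateException("Cannot infer a coder; call withCoder(...)");
  }

  /** Reads the B in Function<A, B> from the function's class, or returns null if it is erased. */
  static Type outputTypeOf(Object fn) {
    for (Type t : fn.getClass().getGenericInterfaces()) {
      if (t instanceof ParameterizedType
          && ((ParameterizedType) t).getRawType() == Function.class) {
        return ((ParameterizedType) t).getActualTypeArguments()[1];
      }
    }
    return null; // lambdas end up here: their synthetic class has no generic signature
  }
}
```

A typed factory such as `readStrings()` sidesteps inference entirely, so it works the same whether the parse function is a lambda or a named class.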
----------------------------------------------------------------
Issue Time Tracking
-------------------
Worklog Id: (was: 129415)
Time Spent: 3.5h (was: 3h 20m)
> Add KuduIO
> ----------
>
> Key: BEAM-2661
> URL: https://issues.apache.org/jira/browse/BEAM-2661
> Project: Beam
> Issue Type: New Feature
> Components: io-ideas
> Reporter: Jean-Baptiste Onofré
> Assignee: Tim Robertson
> Priority: Major
> Time Spent: 3.5h
> Remaining Estimate: 0h
>
> New IO for Apache Kudu ([https://kudu.apache.org/overview.html]).
> This work is in progress [on this
> branch|https://github.com/timrobertson100/beam/tree/BEAM-2661-KuduIO] with
> design aspects documented below.
> h2. The API
> The {{KuduIO}} API requires the user to provide a function to convert objects
> into operations. This is similar to {{JdbcIO}} but different from others,
> such as {{HBaseIO}}, which requires a pre-transform stage to convert objects
> into the mutations to apply. It was originally intended to copy the
> {{HBaseIO}} approach, but this was not possible:
> # The Kudu
> [Operation|https://kudu.apache.org/apidocs/org/apache/kudu/client/Operation.html]
> is a fat class and a subclass of {{KuduRpc<OperationResponse>}}. It
> holds RPC logic, callbacks and a Kudu client. Because of this the
> {{Operation}} does not serialize. Furthermore, the logic for encoding the
> operations (Insert, Upsert, etc.) in the Kudu Java API is one-way only (no
> decode) because the server is written in C++.
> # An alternative could be to introduce a new object to Beam (e.g.
> {{o.a.b.sdk.io.kudu.KuduOperation}}) to enable
> {{PCollection<KuduOperation>}}. This was considered but was discounted
> because:
> ## It is not a familiar API to those already knowing Kudu
> ## It still requires serialization and deserialization of the operations.
> Using the existing Kudu approach of serializing into compact byte arrays
> would require a decoder along the lines of [this almost complete
> example|https://gist.github.com/timrobertson100/df77d1337ba8f5609319751ee7c6e01e].
> This is possible but fragile, given that the Kudu code itself continues to
> evolve.
> ## Deferring the object-to-mutation mapping to within the {{KuduIO}}
> transform keeps the Beam codebase trivial to maintain. {{JdbcIO}} gives us
> the precedent to do this.
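Point 1 above can be illustrated with plain Java serialization. `FakeClient`, `FatOperation` and `FormatFn` are hypothetical stand-ins, not Kudu or Beam classes: an object that holds a live client cannot be serialized, whereas a serializable conversion function (the shape this design borrows from {{JdbcIO}}) can be shipped to workers and invoked there, next to the client.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical stand-in for a live client (sockets, callbacks); deliberately not Serializable. */
class FakeClient {}

/** Mirrors the shape of a "fat" operation: row data plus a reference to the client. */
class FatOperation implements Serializable {
  final String key;
  final FakeClient client; // this non-serializable field poisons the whole object

  FatOperation(String key, FakeClient client) {
    this.key = key;
    this.client = client;
  }
}

/** The alternative: only this serializable function crosses the wire; it builds ops on the worker. */
interface FormatFn<T> extends Serializable {
  FatOperation toOperation(T input, FakeClient client);
}

final class SerializationCheck {
  /** Returns true if Java serialization of obj succeeds, false if some field is not serializable. */
  static boolean serializes(Object obj) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(obj);
      return true;
    } catch (NotSerializableException e) {
      return false;
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
}
```

The operation itself never needs to be encoded or decoded by Beam; only the input record and the function travel, and the worker-side client turns them into operations.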
> h2. Testing framework
> {{Kudu}} is written in C++. While a
> [TestMiniKuduCluster|https://github.com/cloudera/kudu/blob/master/java/kudu-client/src/test/java/org/apache/kudu/client/TestMiniKuduCluster.java]
> does exist in Java, it requires binaries to be available for the target
> environment, which is not portable (edit: this is now a [work in
> progress|https://issues.apache.org/jira/browse/KUDU-2411] in Kudu). Therefore
> we opt for the following:
> # Unit tests will use a mock Kudu client
> # Integration tests will cover all aspects of {{KuduIO}} and use a
> Docker-based Kudu instance
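The unit-testing approach in point 1 can be sketched with a small seam: the write logic talks to an interface, production code backs it with the Kudu client, and tests back it with an in-memory fake. `TableWriter`, `FakeTableWriter` and `BundleWriter` are hypothetical names; the diff's `setKuduService(new KuduServiceImpl<>())` suggests KuduIO injects a similar service, but its actual interface is not shown here. The fake also uses upsert semantics, so a retried bundle leaves the table unchanged, matching the idempotency the sink Javadoc asks for.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical seam between the transform and Kudu. */
interface TableWriter {
  void upsert(String key, String value);

  void flush();
}

/** In-memory fake used by unit tests instead of a real Kudu cluster. */
final class FakeTableWriter implements TableWriter {
  final Map<String, String> rows = new TreeMap<>();
  final List<String> log = new ArrayList<>();

  @Override
  public void upsert(String key, String value) {
    rows.put(key, value); // upsert: replaying the same call leaves the same state
    log.add("upsert " + key);
  }

  @Override
  public void flush() {
    log.add("flush");
  }
}

/** Hypothetical write logic under test: assumes "key=value" records, flushes once per bundle. */
final class BundleWriter {
  static void writeBundle(List<String> records, TableWriter writer) {
    for (String record : records) {
      String[] parts = record.split("=", 2);
      writer.upsert(parts[0], parts[1]);
    }
    writer.flush();
  }
}
```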
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)