[ 
https://issues.apache.org/jira/browse/BEAM-6347?focusedWorklogId=180963&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-180963
 ]

ASF GitHub Bot logged work on BEAM-6347:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/Jan/19 03:52
            Start Date: 04/Jan/19 03:52
    Worklog Time Spent: 10m 
      Work Description: chamikaramj commented on pull request #7397: 
[BEAM-6347] Add website page for developing I/O connectors for Java
URL: https://github.com/apache/beam/pull/7397#discussion_r245194086
 
 

 ##########
 File path: website/src/documentation/io/developing-io-java.md
 ##########
 @@ -0,0 +1,419 @@
+---
+layout: section
+title: "Apache Beam: Developing I/O connectors for Java"
+section_menu: section-menu/documentation.html
+permalink: /documentation/io/developing-io-java/
+redirect_from: /documentation/io/authoring-java/
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+# Developing I/O connectors for Java
+
+To connect to a data store that isn’t supported by Beam’s existing I/O
+connectors, you must create a custom I/O connector that usually consists of a
+source and a sink. All Beam sources and sinks are composite transforms; however,
+the implementation of your custom I/O depends on your use case. See the
+[new I/O connector overview]({{ site.baseurl }}/documentation/io/developing-io-overview/)
+for a general overview of developing a new I/O connector.
+
+This page describes implementation details for developing sources and sinks
+using Java. The Python SDK offers the same functionality, but uses a slightly
+different API. See [Developing I/O connectors for Python]({{ site.baseurl }}/documentation/io/developing-io-python/)
+for information specific to the Python SDK.
+
+## Implementation options
+
+**Sources**
+
+For bounded (batch) sources, there are currently two options for creating a Beam
+source:
+
+1. Use `ParDo` and `GroupByKey`.
+2. Use the `Source` interface and extend the `BoundedSource` abstract subclass.
+
+`ParDo` is the recommended option, as implementing a `Source` can be tricky.
+The [developing I/O connectors overview]({{ site.baseurl }}/documentation/io/developing-io-overview/)
+covers using `ParDo`, and lists some use cases where you might want to use a
+Source (such as [dynamic work rebalancing]({{ site.baseurl }}/blog/2016/05/18/splitAtFraction-method.html)).
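+
+As a minimal sketch of the `ParDo` approach (the names `QuerySpec`, `MyRecord`,
+`querySpecs`, and `readShard` are hypothetical placeholders for your data store's
+client code, not part of Beam), a bounded read can be written as an ordinary
+pipeline fragment:
+
+```java
+// Sketch only: one element per shard/query, expanded into records by a DoFn.
+PCollection<MyRecord> records = pipeline
+    .apply(Create.of(querySpecs))                        // querySpecs: List<QuerySpec>
+    .apply(ParDo.of(new DoFn<QuerySpec, MyRecord>() {
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        for (MyRecord record : readShard(c.element())) { // hypothetical client call
+          c.output(record);
+        }
+      }
+    }));
+```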
+
+For unbounded (streaming) sources, you must use the `Source` interface and extend
+the `UnboundedSource` abstract subclass. `UnboundedSource` supports features that
+are useful for streaming pipelines, such as checkpointing.
+
+Splittable DoFn is a new sources framework that is under development and will
+replace the other options for developing bounded and unbounded sources. For more
+information, see the
+[roadmap for multi-SDK connector efforts]({{ site.baseurl }}/roadmap/connectors-multi-sdk/).
+
+**Sinks**
+
+To create a Beam sink, we recommend that you use a single `ParDo` that writes the
+received records to the data store. However, for file-based sinks, you can use
+the `FileBasedSink` interface.
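+
+As a minimal sketch of the `ParDo`-based sink (`MyRecord` and `writeToStore` are
+hypothetical placeholders for your record type and your data store's client call):
+
+```java
+// Sketch only: write each received record to the external store from a DoFn.
+output.apply(ParDo.of(new DoFn<MyRecord, Void>() {
+  @ProcessElement
+  public void processElement(ProcessContext c) {
+    writeToStore(c.element());   // hypothetical client call; batch writes where possible
+  }
+}));
+```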
+
+## Basic code requirements {#basic-code-reqs}
+
+Beam runners use the classes you provide to read and/or write data using
+multiple worker instances in parallel. As such, the code you provide for
+`Source` and `FileBasedSink` subclasses must meet some basic requirements:
+
+  1. **Serializability:** Your `Source` or `FileBasedSink` subclass, whether
+     bounded or unbounded, must be Serializable. A runner might create multiple
+     instances of your `Source` or `FileBasedSink` subclass to be sent to
+     multiple remote workers to facilitate reading or writing in parallel.  
+
+  1. **Immutability:**
+     Your `Source` or `FileBasedSink` subclass must be effectively immutable.
+     All private fields must be declared final, and all private variables of
+     collection type must be effectively immutable. If your class has setter
+     methods, those methods must return an independent copy of the object with
+     the relevant field modified.  
+
+     You should only use mutable state in your `Source` or `FileBasedSink`
+     subclass if you are using lazy evaluation of expensive computations that
+     you need to implement the source or sink; in that case, you must declare
+     all mutable instance variables transient.  
+
+  1. **Thread-Safety:** Your code must be thread-safe. If you build your source
+     to work with dynamic work rebalancing, it is critical that you make your
+     code thread-safe. The Beam SDK provides a helper class to make this easier.
+     See [Using your BoundedSource with dynamic work rebalancing](#bounded-dynamic)
+     for more details.  
+
+  1. **Testability:** It is critical to exhaustively unit test all of your
+     `Source` and `FileBasedSink` subclasses, especially if you build your
+     classes to work with advanced features such as dynamic work rebalancing. A
+     minor implementation error can lead to data corruption or data loss (such
+     as skipping or duplicating records) that can be hard to detect.  
+
+     To assist in testing `BoundedSource` implementations, you can use the
+     `SourceTestUtils` class. `SourceTestUtils` contains utilities for automatically
+     verifying some of the properties of your `BoundedSource` implementation. You
+     can use `SourceTestUtils` to increase your implementation's test coverage
+     using a wide range of inputs with relatively few lines of code. For
+     examples that use `SourceTestUtils`, see the
+     [AvroSourceTest](https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java) and
+     [TextIOReadTest](https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java)
+     source code. A brief `SourceTestUtils` sketch follows this list.
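+
+For example, a unit test might use `SourceTestUtils` along these lines (a sketch only;
+`MyBoundedSource` is a hypothetical source class, and the split method name follows the
+`BoundedSource` API described later on this page):
+
+```java
+PipelineOptions options = PipelineOptionsFactory.create();
+MyBoundedSource source = new MyBoundedSource("my-table", 1024);   // hypothetical source
+
+// Read all records directly from the source, without running a pipeline.
+List<String> records = SourceTestUtils.readFromSource(source, options);
+
+// Verify that the split sub-sources together produce the same records as the
+// unsplit source.
+SourceTestUtils.assertSourcesEqualReferenceSource(
+    source, source.splitIntoBundles(16, options), options);
+```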
+
+In addition, see the [PTransform style guide]({{ site.baseurl }}/contribute/ptransform-style-guide/)
+for Beam's transform style guidance.
+
+## Implementing the Source interface
+
+To create a data source for your pipeline, you must provide the format-specific
+logic that tells a runner how to read data from your input source, and how to
+split your data source into multiple parts so that multiple worker instances can
+read your data in parallel. If you're creating a data source that reads
+unbounded data, you must provide additional logic for managing your source's
+watermark and optional checkpointing.
+
+Supply the logic for your source by creating the following classes:
+
+  * A subclass of `BoundedSource` if you want to read a finite (batch) data set,
+    or a subclass of `UnboundedSource` if you want to read an infinite (streaming)
+    data set. These subclasses describe the data you want to read, including the
+    data's location and parameters (such as how much data to read).  
+
+  * A subclass of `Source.Reader`. Each `Source` must have an associated `Reader` that
+    captures all the state involved in reading from that `Source`. This can
+    include things like file handles, RPC connections, and other parameters that
+    depend on the specific requirements of the data format you want to read.  
+
+  * The `Reader` class hierarchy mirrors the `Source` hierarchy. If you're extending
+    `BoundedSource`, you'll need to provide an associated `BoundedReader`. If you're
+    extending `UnboundedSource`, you'll need to provide an associated
+    `UnboundedReader`.
+
+  * One or more user-facing wrapper composite transforms (`PTransform`) that
+    wrap read operations.
+
+
+### Implementing the Source subclass
+
+You must create a subclass of either `BoundedSource` or `UnboundedSource`,
+depending on whether your data is a finite batch or an infinite stream. In
+either case, your `Source` subclass must override the abstract methods in the
+superclass. A runner might call these methods when using your data source. For
+example, when reading from a bounded source, a runner uses these methods to
+estimate the size of your data set and to split it up for parallel reading.
+
+Your `Source` subclass should also manage basic information about your data
+source, such as the location. For example, the `Source` implementation in Beam’s
+[DatastoreIO](https://beam.apache.org/releases/javadoc/current/index.html?org/apache/beam/sdk/io/gcp/datastore/DatastoreIO.html)
+class takes host, datasetID, and query as arguments. The connector uses these
+values to obtain data from Cloud Datastore.
+
+#### BoundedSource
+
+`BoundedSource` represents a finite data set from which a Beam runner may read,
+possibly in parallel. `BoundedSource` contains a set of abstract methods that
+the runner uses to split the data set for reading by multiple workers.
+
+To implement a `BoundedSource`, your subclass must override the following
+abstract methods:
+
+  * `splitIntoBundles`: The runner uses this method to split your finite data
+    into bundles of a given size.  
+
+  * `getEstimatedSizeBytes`: The runner uses this method to estimate the total
+    size of your data, in bytes.  
+
+  * `producesSortedKeys`: A method to tell the runner whether your source
+    produces key/value pairs in sorted order. If your source doesn't produce
+    key/value pairs, your implementation of this method must return false.  
+
+  * `createReader`: Creates the associated `BoundedReader` for this
+    `BoundedSource`.
+
+You can see a model of how to implement `BoundedSource` and the required
+abstract methods in Beam’s implementations for Cloud Bigtable
+([BigtableIO.java](https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java))
+and BigQuery ([BigQuerySourceBase.java](https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java)).
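+
+A schematic outline of a `BoundedSource` subclass, based on the methods listed above,
+might look like the following. This is a sketch only: `MyBoundedSource` and its
+constructor parameters are hypothetical, the reader class is sketched later on this
+page, exact method signatures vary across SDK versions, and your SDK version may also
+require other methods (such as an output coder).
+
+```java
+public class MyBoundedSource extends BoundedSource<String> {
+  private final String table;       // hypothetical connection parameter
+  private final long sizeBytes;     // hypothetical pre-computed size estimate
+
+  public MyBoundedSource(String table, long sizeBytes) {
+    this.table = table;
+    this.sizeBytes = sizeBytes;
+  }
+
+  @Override
+  public List<? extends BoundedSource<String>> splitIntoBundles(
+      long desiredBundleSizeBytes, PipelineOptions options) {
+    // A real source would shard the table into sub-sources of roughly the desired
+    // size; returning the source unsplit is a valid (if unparallel) fallback.
+    return Collections.singletonList(this);
+  }
+
+  @Override
+  public long getEstimatedSizeBytes(PipelineOptions options) {
+    return sizeBytes;
+  }
+
+  @Override
+  public boolean producesSortedKeys(PipelineOptions options) {
+    return false;   // this source does not produce key/value pairs in sorted order
+  }
+
+  @Override
+  public BoundedReader<String> createReader(PipelineOptions options) {
+    return new MyBoundedReader(this);   // reader sketched in the Reader section below
+  }
+}
+```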
+
+#### UnboundedSource
+
+`UnboundedSource` represents an infinite data stream from which the runner may
+read, possibly in parallel. `UnboundedSource` contains a set of abstract methods
+that the runner uses to support streaming reads in parallel; these include
+*checkpointing* for failure recovery, *record IDs* to prevent data duplication,
+and *watermarking* for estimating data completeness in downstream parts of your
+pipeline.
+
+To implement an `UnboundedSource`, your subclass must override the following
+abstract methods:
+
+  * `generateInitialSplits`: The runner uses this method to generate a list of
+    `UnboundedSource` objects which represent the number of sub-stream instances
+    from which the service should read in parallel.
+
+  * `getCheckpointMarkCoder`: The runner uses this method to obtain the `Coder` for
+    the checkpoints for your source (if any).
+
+  * `requiresDeduping`: The runner uses this method to determine whether the data
+    requires explicit removal of duplicate records. If this method returns true,
+    the runner will automatically insert a step to remove duplicates from your
+    source's output. This should return true if and only if your source
+    provides record IDs for each record. See `UnboundedReader.getCurrentRecordId`
+    for when this should be done.
+
+  * `createReader`: Creates the associated `UnboundedReader` for this
+    `UnboundedSource`.
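+
+A schematic outline based on these methods might look like the following. This is a
+sketch only: `MyUnboundedSource`, `MyCheckpointMark`, and `MyUnboundedReader` are
+hypothetical classes, and exact signatures vary across SDK versions.
+
+```java
+public class MyUnboundedSource extends UnboundedSource<String, MyCheckpointMark> {
+
+  @Override
+  public List<? extends UnboundedSource<String, MyCheckpointMark>> generateInitialSplits(
+      int desiredNumSplits, PipelineOptions options) {
+    // A real source would return one sub-source per partition or shard of the stream.
+    return Collections.singletonList(this);
+  }
+
+  @Override
+  public UnboundedReader<String> createReader(
+      PipelineOptions options, MyCheckpointMark checkpointMark) {
+    // checkpointMark is null on a fresh start, or the last checkpoint on restore.
+    return new MyUnboundedReader(this, checkpointMark);
+  }
+
+  @Override
+  public Coder<MyCheckpointMark> getCheckpointMarkCoder() {
+    return SerializableCoder.of(MyCheckpointMark.class);
+  }
+
+  @Override
+  public boolean requiresDeduping() {
+    return false;   // return true only if the reader also provides record IDs
+  }
+}
+```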
+
+### Implementing the Reader subclass
+
+You must create a subclass of either `BoundedReader` or `UnboundedReader` to be
+returned by your source subclass's `createReader` method. The runner uses the
+methods in your `Reader` (whether bounded or unbounded) to do the actual reading
+of your dataset.
+
+`BoundedReader` and `UnboundedReader` have similar basic interfaces, which
+you'll need to define. In addition, there are some methods unique to
+`UnboundedReader` that you'll need to implement for working with unbounded data,
+and an optional method you can implement if you want your `BoundedReader` to
+take advantage of dynamic work rebalancing. There are also minor differences in
+the semantics for the `start()` and `advance()` methods when using
+`UnboundedReader`.
+
+#### Reader methods common to both BoundedReader and UnboundedReader
+
+A runner uses the following methods to read data using `BoundedReader` or
+`UnboundedReader`:
+
+  * `start`: Initializes the `Reader` and advances to the first record to be read.
+    This method is called exactly once when the runner begins reading your data,
+    and is a good place to put expensive operations needed for initialization.  
+
+  * `advance`: Advances the reader to the next valid record. This method must
+    return false if there is no more input available. `BoundedReader` should stop
+    reading once advance returns false, but `UnboundedReader` can return true in
+    future calls once more data is available from your stream.  
+
+  * `getCurrent`: Returns the data record at the current position, last read by
+    start or advance.  
+
+  * `getCurrentTimestamp`: Returns the timestamp for the current data record. You
+    only need to override `getCurrentTimestamp` if your source reads data that has
+    intrinsic timestamps. The runner uses this value to set the intrinsic
+    timestamp for each element in the resulting output `PCollection`.
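+
+A schematic `BoundedReader` implementing these methods might look like the following.
+This is a sketch only: `MyBoundedSource` is the hypothetical source sketched above,
+and `openTable` stands in for whatever connection or query logic your data store needs.
+
+```java
+public class MyBoundedReader extends BoundedSource.BoundedReader<String> {
+  private final MyBoundedSource source;
+  private Iterator<String> records;   // opened lazily in start()
+  private String current;
+
+  public MyBoundedReader(MyBoundedSource source) {
+    this.source = source;
+  }
+
+  @Override
+  public boolean start() throws IOException {
+    records = openTable(source);   // hypothetical: open a connection and issue the query
+    return advance();
+  }
+
+  @Override
+  public boolean advance() {
+    if (!records.hasNext()) {
+      return false;                // no more input for this bounded reader
+    }
+    current = records.next();
+    return true;
+  }
+
+  @Override
+  public String getCurrent() {
+    if (current == null) {
+      throw new NoSuchElementException();
+    }
+    return current;
+  }
+
+  @Override
+  public void close() throws IOException {
+    // Release any connections or handles opened in start().
+  }
+
+  @Override
+  public BoundedSource<String> getCurrentSource() {
+    return source;
+  }
+}
+```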
+
+#### Reader methods unique to UnboundedReader
+
+In addition to the basic `Reader` interface, `UnboundedReader` has some
+additional methods for managing reads from an unbounded data source:
+
+  * `getCurrentRecordId`: Returns a unique identifier for the current record.
+    The runner uses these record IDs to filter out duplicate records. If your
+    data has logical IDs present in each record, you can have this method return
+    them; otherwise, you can return a hash of the record contents, using at
+    least a 128-bit hash. It is incorrect to use Java's `Object.hashCode()`, as
+    a 32-bit hash is generally insufficient for preventing collisions, and
+    `hashCode()` is not guaranteed to be stable across processes.  
+
+    Implementing `getCurrentRecordId` is optional if your source uses a
+    checkpointing scheme that uniquely identifies each record. For example, if
+    your splits are files and the checkpoints are file positions up to which all
+    data has been read, you do not need record IDs. However, record IDs can
+    still be useful if upstream systems writing data to your source occasionally
+    produce duplicate records that your source might then read.  
+
+  * `getWatermark`: Returns a watermark that your `Reader` provides. The watermark
+    is the approximate lower bound on timestamps of future elements to be read
+    by your `Reader`. The runner uses the watermark as an estimate of data
+    completeness. Watermarks are used in windowing and triggers.  
+
+  * `getCheckpointMark`: The runner uses this method to create a checkpoint in
+    your data stream. The checkpoint represents the progress of the
+    `UnboundedReader`, which can be used for failure recovery. Different data
+    streams may use different checkpointing methods; some sources might require
+    received records to be acknowledged, while others might use positional
+    checkpointing. You'll need to tailor this method to the most appropriate
+    checkpointing scheme. For example, you might have this method return the
+    most recently acked record(s).  
+
+    `getCheckpointMark` is optional; you don't need to implement it if your data
+    does not have meaningful checkpoints. However, if you choose not to
+    implement checkpointing in your source, you may encounter duplicate data or
+    data loss in your pipeline, depending on whether your data source tries to
+    re-send records in case of errors.  
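+
+As a sketch of how these methods often end up looking, here are fragments from a
+hypothetical `UnboundedReader` that keeps `current`, `currentTimestamp`, and
+`lastReadOffset` fields, uses the hypothetical `MyCheckpointMark` from above, and
+hashes record contents with Guava's `Hashing` (an assumption, not a requirement):
+
+```java
+@Override
+public byte[] getCurrentRecordId() {
+  // No logical ID in the data, so hash the record contents with a 128-bit+ hash.
+  return Hashing.sha256().hashString(current, StandardCharsets.UTF_8).asBytes();
+}
+
+@Override
+public Instant getWatermark() {
+  // Approximate lower bound on timestamps of records still to be read from this split;
+  // the 30-second allowance for out-of-order data is an assumption, not a rule.
+  return currentTimestamp.minus(Duration.standardSeconds(30));
+}
+
+@Override
+public UnboundedSource.CheckpointMark getCheckpointMark() {
+  // Positional checkpointing: record how far this reader has read.
+  return new MyCheckpointMark(lastReadOffset);
+}
+```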
+
+You can read a bounded `PCollection` from an `UnboundedSource` by specifying
+either `.withMaxNumRecords` or `.withMaxReadTime` when you read from your
+source.  `.withMaxNumRecords` reads a fixed maximum number of records from your
+unbounded source, while `.withMaxReadTime` reads from your unbounded source for
+a fixed maximum time duration.
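+
+For example (a sketch only, reusing the hypothetical `MyUnboundedSource` from above):
+
+```java
+// Read at most 1000 records from the unbounded source, producing a bounded PCollection.
+PCollection<String> sample =
+    pipeline.apply(Read.from(new MyUnboundedSource()).withMaxNumRecords(1000));
+// Alternatively, bound the read by time:
+//     Read.from(new MyUnboundedSource()).withMaxReadTime(Duration.standardMinutes(5))
+```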
+
+#### Using your BoundedSource with dynamic work rebalancing {#bounded-dynamic}
+
+If your source provides bounded data, you can have your `BoundedReader` work
+with dynamic work rebalancing by implementing the method `splitAtFraction`. The
+runner may call `splitAtFraction` concurrently with start or advance on a given
+reader so that the remaining data in your `Source` can be split and
+redistributed to other workers.
+
+When you implement `splitAtFraction`, your code must produce a
+mutually-exclusive set of splits where the union of those splits matches the
+total data set.
+
+If you implement `splitAtFraction`, you must implement both `splitAtFraction`
+and `getFractionConsumed` in a thread-safe manner, or data loss is possible. You
+should also unit-test your implementation exhaustively to avoid data duplication
+or data loss.
+
+To ensure that your code is thread-safe, use the `RangeTracker` thread-safe
+helper object to manage positions in your data source when implementing
+`splitAtFraction` and `getFractionConsumed`.
+
+We highly recommend that you unit test your implementations of
+`splitAtFraction` using the `SourceTestUtils` class. `SourceTestUtils` contains
+a number of methods for testing your implementation of `splitAtFraction`,
+including exhaustive automatic testing.
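+
+For example, the exhaustive check can be a single call (a sketch only, reusing the
+hypothetical `MyBoundedSource` from above):
+
+```java
+PipelineOptions options = PipelineOptionsFactory.create();
+// Tries splitting at every fraction, at every point in the reading process, and
+// verifies that the primary and residual sources together cover the full data set.
+SourceTestUtils.assertSplitAtFractionExhaustive(new MyBoundedSource("my-table", 1024), options);
+```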
+
+### Implementing wrapper transforms
+
+When you create a source that end-users will use, we recommend that
+you do not expose the code for the source itself. Instead, create a wrapping
+`PTransform`. See [PTransform wrappers](#ptransform-wrappers) to see how
+and why to avoid exposing your sources.
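+
+As a minimal sketch of such a wrapper (`MyIO` and `MyBoundedSource` are hypothetical;
+users then write `p.apply(MyIO.read())` rather than touching the source class directly):
+
+```java
+public class MyIO {
+  // Entry point that users call; the source class itself can stay package-private.
+  public static Read read() {
+    return new Read();
+  }
+
+  public static class Read extends PTransform<PBegin, PCollection<String>> {
+    @Override
+    public PCollection<String> expand(PBegin input) {
+      return input.apply(
+          org.apache.beam.sdk.io.Read.from(new MyBoundedSource("my-table", 1024)));
+    }
+  }
+}
+```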
+
+### Convenience Source and Reader base classes
+
+The Beam SDK contains some convenient abstract base classes to help you create
+`Source` and `Reader` classes that work with common data storage formats, like
+files.
+
+#### FileBasedSource
+
+If your data source uses files, you can derive your `Source` and `Reader`
+classes from the `FileBasedSource` and `FileBasedReader` abstract base classes.
+`FileBasedSource` is a bounded source subclass that implements code common to
+Beam sources that interact with files, including:
+
+  * File pattern expansion
+  * Sequential record reading
+  * Split points
+
+
+## Implementing the FileBasedSink interface
+
+If your data source uses files, you can use the `FileBasedSink` interface to
+create a file-based sink for your pipeline. If your sink is not file-based,
+implement a composite `PTransform` that uses `ParDo`, as discussed in the
+[developing I/O connectors overview]({{ site.baseurl }}/documentation/io/developing-io-overview/).
+
+When using the `FileBasedSink` interface, you must provide the format-specific
+logic that tells the runner how to write bounded data from your pipeline's
+`PCollection`s to an output sink. The runner writes bundles of data in parallel
+using multiple workers.
+
+Supply the logic for your file-based sink by implementing the following classes:
+
+  * A subclass of the abstract base class `FileBasedSink`. `FileBasedSink`
+    describes a location or resource that your pipeline can write to in
+    parallel.
+
+  * A user-facing wrapper `PTransform` that, as part of the logic, calls
+    [WriteFiles](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java)
+    and passes your `FileBasedSink` as a parameter.
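+
+A minimal sketch of such a wrapper (`MyFileSink` is a hypothetical `FileBasedSink`
+subclass, and the exact generics and return types of `WriteFiles` and `FileBasedSink`
+vary across SDK versions):
+
+```java
+public static class WriteToMyFiles extends PTransform<PCollection<String>, PDone> {
+  private final MyFileSink sink;   // hypothetical FileBasedSink subclass
+
+  WriteToMyFiles(MyFileSink sink) {
+    this.sink = sink;
+  }
+
+  @Override
+  public PDone expand(PCollection<String> input) {
+    // WriteFiles performs the parallel, bundle-based file writing for the sink.
+    input.apply(WriteFiles.to(sink));
+    return PDone.in(input.getPipeline());
+  }
+}
+```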
+
+### Implementing the FileBasedSink subclass
+
+You must create a subclass of the abstract base class `FileBasedSink`. To avoid
+exposing your sink to end-users, your class should be protected or private.
+
+The `FileBasedSink` abstract base class implements code common to Beam sinks
+that interact with files, including:
+
+  * Setting file headers and footers
+  * Sequential record writing
+  * Setting the output MIME type
+
+`FileBasedSink` describes a location or resource that your pipeline can write to
 
 Review comment:
   Probably remove this sentence. 
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 180963)
    Time Spent: 1h 50m  (was: 1h 40m)

> Add page for developing I/O connectors for Java
> -----------------------------------------------
>
>                 Key: BEAM-6347
>                 URL: https://issues.apache.org/jira/browse/BEAM-6347
>             Project: Beam
>          Issue Type: Bug
>          Components: website
>            Reporter: Melissa Pashniak
>            Assignee: Melissa Pashniak
>            Priority: Minor
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
