http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSource.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSource.java deleted file mode 100644 index eae5e8b..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSource.java +++ /dev/null @@ -1,544 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
 */
package com.google.cloud.dataflow.sdk.io;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.JAXBCoder;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
import com.google.common.base.Preconditions;

import org.codehaus.stax2.XMLInputFactory2;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.SequenceInputStream;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.NoSuchElementException;

import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import javax.xml.bind.ValidationEvent;
import javax.xml.bind.ValidationEventHandler;
import javax.xml.stream.FactoryConfigurationError;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

// CHECKSTYLE.OFF: JavadocStyle
/**
 * A source that can be used to read XML files. This source reads one or more
 * XML files and creates a {@code PCollection} of a given type. A Dataflow read transform can be
 * created by passing an {@code XmlSource} object to {@code Read.from()}. Please note the
 * example given below.
 *
 * <p>The XML file must be of the following form, where {@code root} and {@code record} are XML
 * element names that are defined by the user:
 *
 * <pre>
 * {@code
 * <root>
 * <record> ... </record>
 * <record> ... </record>
 * <record> ... </record>
 * ...
 * <record> ... </record>
 * </root>
 * }
 * </pre>
 *
 * <p>Basically, the XML document should contain a single root element with an inner list consisting
 * entirely of record elements. The records may contain arbitrary XML content; however, that content
 * <b>must not</b> contain the start {@code <record>} or end {@code </record>} tags. This
 * restriction enables reading from large XML files in parallel from different offsets in the file.
 *
 * <p>Root and/or record elements may additionally contain an arbitrary number of XML attributes.
 * Additionally users must provide a class of a JAXB annotated Java type that can be used to convert
 * records into Java objects and vice versa using JAXB marshalling/unmarshalling mechanisms. Reading
 * the source will generate a {@code PCollection} of the given JAXB annotated Java type.
 * Optionally users may provide a minimum size of a bundle that should be created for the source.
 *
 * <p>The following example shows how to read from {@link XmlSource} in a Dataflow pipeline:
 *
 * <pre>
 * {@code
 * XmlSource<String> source = XmlSource.<String>from(file.toPath().toString())
 *     .withRootElement("root")
 *     .withRecordElement("record")
 *     .withRecordClass(Record.class);
 * PCollection<String> output = p.apply(Read.from(source));
 * }
 * </pre>
 *
 * <p>Currently, only XML files that use single-byte characters are supported. Using a file that
 * contains multi-byte characters may result in data loss or duplication.
 *
 * <p>To use {@link XmlSource}:
 * <ol>
 *   <li>Explicitly declare a dependency on org.codehaus.woodstox:stax2-api</li>
 *   <li>Include a compatible implementation on the classpath at run-time,
 *       such as org.codehaus.woodstox:woodstox-core-asl</li>
 * </ol>
 *
 * <p>These dependencies have been declared as optional in Maven sdk/pom.xml file of
 * Google Cloud Dataflow.
 *
 * <p><h3>Permissions</h3>
 * Permission requirements depend on the
 * {@link com.google.cloud.dataflow.sdk.runners.PipelineRunner PipelineRunner} that is
 * used to execute the Dataflow job. Please refer to the documentation of corresponding
 * {@link PipelineRunner PipelineRunners} for more details.
 *
 * @param <T> Type of the objects that represent the records of the XML file. The
 *     {@code PCollection} generated by this source will be of this type.
 */
// CHECKSTYLE.ON: JavadocStyle
public class XmlSource<T> extends FileBasedSource<T> {

  // XML version written into the synthetic document preamble fed to the parser in
  // XMLReader.startReading().
  // NOTE(review): version "1.1" is declared while the class javadoc restricts input to
  // single-byte characters; confirm "1.1" (rather than "1.0") is intentional.
  private static final String XML_VERSION = "1.1";
  // Default minimum bundle size (8 KiB); see OffsetBasedSource for the meaning of minBundleSize.
  private static final int DEFAULT_MIN_BUNDLE_SIZE = 8 * 1024;
  // Name of the single root element wrapping all records; required, validated in validate().
  private final String rootElement;
  // Name of the per-record element; required, validated in validate().
  private final String recordElement;
  // JAXB-annotated class records are unmarshalled into; required, validated in validate().
  private final Class<T> recordClass;

  /**
   * Creates an XmlSource for a single XML file or a set of XML files defined by a Java "glob" file
   * pattern. Each XML file should be of the form defined in {@link XmlSource}.
   */
  public static <T> XmlSource<T> from(String fileOrPatternSpec) {
    return new XmlSource<>(fileOrPatternSpec, DEFAULT_MIN_BUNDLE_SIZE, null, null, null);
  }

  /**
   * Sets name of the root element of the XML document. This will be used to create a valid starting
   * root element when initiating a bundle of records created from an XML document. This is a
   * required parameter.
   */
  public XmlSource<T> withRootElement(String rootElement) {
    return new XmlSource<>(
        getFileOrPatternSpec(), getMinBundleSize(), rootElement, recordElement, recordClass);
  }

  /**
   * Sets name of the record element of the XML document. This will be used to determine offset of
   * the first record of a bundle created from the XML document. This is a required parameter.
   */
  public XmlSource<T> withRecordElement(String recordElement) {
    return new XmlSource<>(
        getFileOrPatternSpec(), getMinBundleSize(), rootElement, recordElement, recordClass);
  }

  /**
   * Sets a JAXB annotated class that can be populated using a record of the provided XML file. This
   * will be used when unmarshalling record objects from the XML file. This is a required
   * parameter.
   */
  public XmlSource<T> withRecordClass(Class<T> recordClass) {
    return new XmlSource<>(
        getFileOrPatternSpec(), getMinBundleSize(), rootElement, recordElement, recordClass);
  }

  /**
   * Sets a parameter {@code minBundleSize} for the minimum bundle size of the source. Please refer
   * to {@link OffsetBasedSource} for the definition of minBundleSize. This is an optional
   * parameter.
   */
  public XmlSource<T> withMinBundleSize(long minBundleSize) {
    return new XmlSource<>(
        getFileOrPatternSpec(), minBundleSize, rootElement, recordElement, recordClass);
  }

  // Private constructor for a whole-file (or glob) source; used by the builder methods above.
  private XmlSource(String fileOrPattern, long minBundleSize, String rootElement,
      String recordElement, Class<T> recordClass) {
    super(fileOrPattern, minBundleSize);
    this.rootElement = rootElement;
    this.recordElement = recordElement;
    this.recordClass = recordClass;
  }

  // Private constructor for a sub-range of a single file; used by createForSubrangeOfFile().
  private XmlSource(String fileOrPattern, long minBundleSize, long startOffset, long endOffset,
      String rootElement, String recordElement, Class<T> recordClass) {
    super(fileOrPattern, minBundleSize, startOffset, endOffset);
    this.rootElement = rootElement;
    this.recordElement = recordElement;
    this.recordClass = recordClass;
  }

  @Override
  protected FileBasedSource<T> createForSubrangeOfFile(String fileName, long start, long end) {
    return new XmlSource<T>(
        fileName, getMinBundleSize(), start, end, rootElement, recordElement, recordClass);
  }

  @Override
  protected FileBasedReader<T> createSingleFileReader(PipelineOptions options) {
    return new XMLReader<T>(this);
  }

  @Override
  public boolean producesSortedKeys(PipelineOptions options) throws Exception {
    // Records are emitted in document order, which carries no key ordering guarantee.
    return false;
  }

  @Override
  public void validate() {
    super.validate();
    // All three builder parameters are required; fail fast with a pointer to the builder method.
    Preconditions.checkNotNull(
        rootElement, "rootElement is null. Use builder method withRootElement() to set this.");
    Preconditions.checkNotNull(
        recordElement,
        "recordElement is null. Use builder method withRecordElement() to set this.");
    Preconditions.checkNotNull(
        recordClass, "recordClass is null. Use builder method withRecordClass() to set this.");
  }

  @Override
  public Coder<T> getDefaultOutputCoder() {
    // Records are JAXB-annotated, so a JAXBCoder over the record class is the natural default.
    return JAXBCoder.of(recordClass);
  }

  /** Returns the name of the root element of the XML document. */
  public String getRootElement() {
    return rootElement;
  }

  /** Returns the name of the record element of the XML document. */
  public String getRecordElement() {
    return recordElement;
  }

  /** Returns the JAXB annotated class used to unmarshal records. */
  public Class<T> getRecordClass() {
    return recordClass;
  }

  /**
   * A {@link Source.Reader} for reading JAXB annotated Java objects from an XML file. The XML
   * file should be of the form defined at {@link XmlSource}.
   *
   * <p>Timestamped values are currently unsupported - all values implicitly have the timestamp
   * of {@code BoundedWindow.TIMESTAMP_MIN_VALUE}.
   *
   * @param <T> Type of objects that will be read by the reader.
   */
  private static class XMLReader<T> extends FileBasedReader<T> {
    // The amount of bytes read from the channel to memory when determining the starting offset of
    // the first record in a bundle. After matching to starting offset of the first record the
    // remaining bytes read to this buffer and the bytes still not read from the channel are used to
    // create the XML parser.
    private static final int BUF_SIZE = 1024;

    // This should be the maximum number of bytes a character will encode to, for any encoding
    // supported by XmlSource. Currently this is set to 4 since UTF-8 characters may be
    // four bytes.
    private static final int MAX_CHAR_BYTES = 4;

    // In order to support reading starting in the middle of an XML file, we construct an imaginary
    // well-formed document (a header and root tag followed by the contents of the input starting at
    // the record boundary) and feed it to the parser. Because of this, the offset reported by the
    // XML parser is not the same as offset in the original file. They differ by a constant amount:
    // offsetInOriginalFile = parser.getLocation().getCharacterOffset() + parserBaseOffset;
    // Note that this is true only for files with single-byte characters.
    // It appears that, as of writing, there does not exist a Java XML parser capable of correctly
    // reporting byte offsets of elements in the presence of multi-byte characters.
    private long parserBaseOffset = 0;
    // True once startReading() has completed; getCurrent() throws before that.
    private boolean readingStarted = false;

    // If true, the current bundle does not contain any records.
    private boolean emptyBundle = false;

    // Unmarshaller for converting record elements into instances of recordClass; created eagerly
    // in the constructor so misconfiguration fails before any I/O.
    private Unmarshaller jaxbUnmarshaller = null;
    // StAX parser over the synthetic document; created lazily in startReading()/setUpXMLParser().
    private XMLStreamReader parser = null;

    // Most recently unmarshalled record, returned by getCurrent().
    private T currentRecord = null;

    // Byte offset of the current record in the XML file provided when creating the source.
    private long currentByteOffset = 0;

    public XMLReader(XmlSource<T> source) {
      super(source);

      // Set up a JAXB Unmarshaller that can be used to unmarshall record objects.
      try {
        JAXBContext jaxbContext = JAXBContext.newInstance(getCurrentSource().recordClass);
        jaxbUnmarshaller = jaxbContext.createUnmarshaller();

        // Throw errors if validation fails. JAXB by default ignores validation errors.
        jaxbUnmarshaller.setEventHandler(new ValidationEventHandler() {
          @Override
          public boolean handleEvent(ValidationEvent event) {
            throw new RuntimeException(event.getMessage(), event.getLinkedException());
          }
        });
      } catch (JAXBException e) {
        throw new RuntimeException(e);
      }
    }

    @Override
    public synchronized XmlSource<T> getCurrentSource() {
      return (XmlSource<T>) super.getCurrentSource();
    }

    @Override
    protected void startReading(ReadableByteChannel channel) throws IOException {
      // This method determines the correct starting offset of the first record by reading bytes
      // from the ReadableByteChannel. This implementation does not need the channel to be a
      // SeekableByteChannel.
      // The method tries to determine the first record element in the byte channel. The first
      // record must start with the characters "<recordElement" where "recordElement" is the
      // record element of the XML document described above. For the match to be complete this
      // has to be followed by one of following.
      // * any whitespace character
      // * '>' character
      // * '/' character (to support empty records).
      //
      // After this match this method creates the XML parser for parsing the XML document,
      // feeding it a fake document consisting of an XML header and the <rootElement> tag followed
      // by the contents of channel starting from <recordElement. The <rootElement> tag may be never
      // closed.

      // This stores any bytes that should be used prior to the remaining bytes of the channel when
      // creating an XML parser object.
      ByteArrayOutputStream preambleByteBuffer = new ByteArrayOutputStream();
      // A dummy declaration and root for the document with proper XML version and encoding. Without
      // this XML parsing may fail or may produce incorrect results.

      byte[] dummyStartDocumentBytes =
          ("<?xml version=\"" + XML_VERSION + "\" encoding=\"UTF-8\" ?>"
              + "<" + getCurrentSource().rootElement + ">").getBytes(StandardCharsets.UTF_8);
      preambleByteBuffer.write(dummyStartDocumentBytes);
      // Gets the byte offset (in the input file) of the first record in ReadableByteChannel. This
      // method returns the offset and stores any bytes that should be used when creating the XML
      // parser in preambleByteBuffer.
      long offsetInFileOfRecordElement =
          getFirstOccurenceOfRecordElement(channel, preambleByteBuffer);
      if (offsetInFileOfRecordElement < 0) {
        // Bundle has no records. So marking this bundle as an empty bundle.
        emptyBundle = true;
        return;
      } else {
        byte[] preambleBytes = preambleByteBuffer.toByteArray();
        currentByteOffset = offsetInFileOfRecordElement;
        setUpXMLParser(channel, preambleBytes);
        // The parser sees dummyStartDocumentBytes before the real file content, so its reported
        // offsets lead the file offsets by exactly that many bytes.
        parserBaseOffset = offsetInFileOfRecordElement - dummyStartDocumentBytes.length;
      }
      readingStarted = true;
    }

    // Gets the first occurrence of the next record within the given ReadableByteChannel. Puts
    // any bytes read past the starting offset of the next record back to the preambleByteBuffer.
    // If a record is found, returns the starting offset of the record, otherwise
    // returns -1.
    // NOTE(review): method name contains the typo "Occurence" (should be "Occurrence"); it is
    // private, so renaming would be safe but is out of scope for a documentation-only pass.
    private long getFirstOccurenceOfRecordElement(
        ReadableByteChannel channel, ByteArrayOutputStream preambleByteBuffer) throws IOException {
      int byteIndexInRecordElementToMatch = 0;
      // Index of the byte in the string "<recordElement" to be matched
      // against the current byte from the stream.
      boolean recordStartBytesMatched = false; // "<recordElement" matched. Still have to match the
      // next character to confirm if this is a positive match.
      boolean fullyMatched = false; // If true, record element was fully matched.

      // This gives the offset of the byte currently being read. We do a '-1' here since we
      // increment this value at the beginning of the while loop below.
      long offsetInFileOfCurrentByte = getCurrentSource().getStartOffset() - 1;
      long startingOffsetInFileOfCurrentMatch = -1;
      // If this is non-negative, currently there is a match in progress and this value gives the
      // starting offset of the match currently being conducted.
      boolean matchStarted = false; // If true, a match is currently in progress.

      // These two values are used to determine the character immediately following a match for
      // "<recordElement". Please see the comment for 'MAX_CHAR_BYTES' above.
      byte[] charBytes = new byte[MAX_CHAR_BYTES];
      int charBytesFound = 0;

      ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE);
      byte[] recordStartBytes =
          ("<" + getCurrentSource().recordElement).getBytes(StandardCharsets.UTF_8);

      outer: while (channel.read(buf) > 0) {
        buf.flip();
        while (buf.hasRemaining()) {
          offsetInFileOfCurrentByte++;
          byte b = buf.get();
          boolean reset = false;
          if (recordStartBytesMatched) {
            // We already matched "<recordElement" reading the next character to determine if this
            // is a positive match for a new record.
            charBytes[charBytesFound] = b;
            charBytesFound++;
            Character c = null;
            if (charBytesFound == charBytes.length) {
              // Decode the first character from the buffered bytes.
              // NOTE(review): 'charBuf' below is allocated and flipped but its contents are never
              // written or read; the decoded character comes from 'read'. Looks like dead code.
              CharBuffer charBuf = CharBuffer.allocate(1);
              InputStream charBufStream = new ByteArrayInputStream(charBytes);
              java.io.Reader reader =
                  new InputStreamReader(charBufStream, StandardCharsets.UTF_8);
              int read = reader.read();
              if (read <= 0) {
                return -1;
              }
              charBuf.flip();
              c = (char) read;
            } else {
              continue;
            }

            // Record start may be of following forms
            // * "<recordElement<whitespace>..."
            // * "<recordElement>..."
            // * "<recordElement/..."
            if (Character.isWhitespace(c) || c == '>' || c == '/') {
              fullyMatched = true;
              // Add the recordStartBytes and charBytes to preambleByteBuffer since these were
              // already read from the channel.
              preambleByteBuffer.write(recordStartBytes);
              preambleByteBuffer.write(charBytes);
              // Also add the rest of the current buffer to preambleByteBuffer.
              while (buf.hasRemaining()) {
                preambleByteBuffer.write(buf.get());
              }
              break outer;
            } else {
              // Matching was unsuccessful. Reset the buffer to include bytes read for the char.
              ByteBuffer newbuf = ByteBuffer.allocate(BUF_SIZE);
              newbuf.put(charBytes);
              offsetInFileOfCurrentByte -= charBytes.length;
              while (buf.hasRemaining()) {
                newbuf.put(buf.get());
              }
              newbuf.flip();
              buf = newbuf;

              // Ignore everything and try again starting from the current buffer.
              reset = true;
            }
          } else if (b == recordStartBytes[byteIndexInRecordElementToMatch]) {
            // Next byte matched.
            if (!matchStarted) {
              // Match was for the first byte, record the starting offset.
              matchStarted = true;
              startingOffsetInFileOfCurrentMatch = offsetInFileOfCurrentByte;
            }
            byteIndexInRecordElementToMatch++;
          } else {
            // Not a match. Ignore everything and try again starting at current point.
            reset = true;
          }
          if (reset) {
            // Clear variables and try to match starting from the next byte.
            byteIndexInRecordElementToMatch = 0;
            startingOffsetInFileOfCurrentMatch = -1;
            matchStarted = false;
            recordStartBytesMatched = false;
            charBytes = new byte[MAX_CHAR_BYTES];
            charBytesFound = 0;
          }
          if (byteIndexInRecordElementToMatch == recordStartBytes.length) {
            // "<recordElement" matched. Need to still check next byte since this might be an
            // element that has "recordElement" as a prefix.
            recordStartBytesMatched = true;
          }
        }
        buf.clear();
      }

      if (!fullyMatched) {
        return -1;
      } else {
        return startingOffsetInFileOfCurrentMatch;
      }
    }

    // Creates the StAX parser over a stream consisting of 'lookAhead' (the synthetic preamble
    // plus any bytes already consumed from the channel) followed by the rest of the channel,
    // and advances it to the first START_ELEMENT matching recordElement.
    private void setUpXMLParser(ReadableByteChannel channel, byte[] lookAhead) throws IOException {
      try {
        // We use Woodstox because the StAX implementation provided by OpenJDK reports
        // character locations incorrectly. Note that Woodstox still currently reports *byte*
        // locations incorrectly when parsing documents that contain multi-byte characters.
        XMLInputFactory2 xmlInputFactory = (XMLInputFactory2) XMLInputFactory.newInstance();
        this.parser = xmlInputFactory.createXMLStreamReader(
            new SequenceInputStream(
                new ByteArrayInputStream(lookAhead), Channels.newInputStream(channel)),
            "UTF-8");

        // Current offset should be the offset before reading the record element.
        while (true) {
          int event = parser.next();
          if (event == XMLStreamConstants.START_ELEMENT) {
            String localName = parser.getLocalName();
            if (localName.equals(getCurrentSource().recordElement)) {
              break;
            }
          }
        }
      } catch (FactoryConfigurationError | XMLStreamException e) {
        throw new IOException(e);
      }
    }

    @Override
    protected boolean readNextRecord() throws IOException {
      if (emptyBundle) {
        currentByteOffset = Long.MAX_VALUE;
        return false;
      }
      try {
        // Update current offset and check if the next value is the record element.
        currentByteOffset = parserBaseOffset + parser.getLocation().getCharacterOffset();
        while (parser.getEventType() != XMLStreamConstants.START_ELEMENT) {
          parser.next();
          currentByteOffset = parserBaseOffset + parser.getLocation().getCharacterOffset();
          if (parser.getEventType() == XMLStreamConstants.END_DOCUMENT) {
            currentByteOffset = Long.MAX_VALUE;
            return false;
          }
        }
        // Unmarshalling consumes the record element and leaves the parser positioned after it.
        JAXBElement<T> jb = jaxbUnmarshaller.unmarshal(parser, getCurrentSource().recordClass);
        currentRecord = jb.getValue();
        return true;
      } catch (JAXBException | XMLStreamException e) {
        throw new IOException(e);
      }
    }

    @Override
    public T getCurrent() throws NoSuchElementException {
      if (!readingStarted) {
        throw new NoSuchElementException();
      }
      return currentRecord;
    }

    @Override
    protected boolean isAtSplitPoint() {
      // Every record is at a split point.
      return true;
    }

    @Override
    protected long getCurrentOffset() {
      return currentByteOffset;
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java deleted file mode 100644 index 0ce2a5e..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java +++ /dev/null @@ -1,989 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.google.cloud.dataflow.sdk.io.bigtable; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - -import org.apache.beam.sdk.annotations.Experimental; - -import com.google.bigtable.v1.Mutation; -import com.google.bigtable.v1.Row; -import com.google.bigtable.v1.RowFilter; -import com.google.bigtable.v1.SampleRowKeysResponse; -import com.google.cloud.bigtable.config.BigtableOptions; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.coders.Proto2Coder; -import com.google.cloud.dataflow.sdk.coders.VarLongCoder; -import com.google.cloud.dataflow.sdk.io.BoundedSource; -import com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader; -import com.google.cloud.dataflow.sdk.io.Sink.WriteOperation; -import com.google.cloud.dataflow.sdk.io.Sink.Writer; -import com.google.cloud.dataflow.sdk.io.range.ByteKey; -import com.google.cloud.dataflow.sdk.io.range.ByteKeyRange; -import com.google.cloud.dataflow.sdk.io.range.ByteKeyRangeTracker; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.runners.PipelineRunner; -import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.cloud.dataflow.sdk.util.DataflowReleaseInfo; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PBegin; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.PDone; -import com.google.common.base.MoreObjects; -import com.google.common.collect.ImmutableList; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.protobuf.ByteString; -import com.google.protobuf.Empty; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import 
java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.concurrent.ConcurrentLinkedQueue; - -import javax.annotation.Nullable; - -/** - * A bounded source and sink for Google Cloud Bigtable. - * - * <p>For more information, see the online documentation at - * <a href="https://cloud.google.com/bigtable/">Google Cloud Bigtable</a>. - * - * <h3>Reading from Cloud Bigtable</h3> - * - * <p>The Bigtable source returns a set of rows from a single table, returning a - * {@code PCollection<Row>}. - * - * <p>To configure a Cloud Bigtable source, you must supply a table id and a {@link BigtableOptions} - * or builder configured with the project and other information necessary to identify the - * Bigtable cluster. A {@link RowFilter} may also optionally be specified using - * {@link BigtableIO.Read#withRowFilter}. For example: - * - * <pre>{@code - * BigtableOptions.Builder optionsBuilder = - * new BigtableOptions.Builder() - * .setProjectId("project") - * .setClusterId("cluster") - * .setZoneId("zone"); - * - * Pipeline p = ...; - * - * // Scan the entire table. - * p.apply("read", - * BigtableIO.read() - * .withBigtableOptions(optionsBuilder) - * .withTableId("table")); - * - * // Scan a subset of rows that match the specified row filter. - * p.apply("filtered read", - * BigtableIO.read() - * .withBigtableOptions(optionsBuilder) - * .withTableId("table") - * .withRowFilter(filter)); - * }</pre> - * - * <h3>Writing to Cloud Bigtable</h3> - * - * <p>The Bigtable sink executes a set of row mutations on a single table. It takes as input a - * {@link PCollection PCollection<KV<ByteString, Iterable<Mutation>>>}, where the - * {@link ByteString} is the key of the row being mutated, and each {@link Mutation} represents an - * idempotent transformation to that row. 
- * - * <p>To configure a Cloud Bigtable sink, you must supply a table id and a {@link BigtableOptions} - * or builder configured with the project and other information necessary to identify the - * Bigtable cluster, for example: - * - * <pre>{@code - * BigtableOptions.Builder optionsBuilder = - * new BigtableOptions.Builder() - * .setProjectId("project") - * .setClusterId("cluster") - * .setZoneId("zone"); - * - * PCollection<KV<ByteString, Iterable<Mutation>>> data = ...; - * - * data.apply("write", - * BigtableIO.write() - * .withBigtableOptions(optionsBuilder) - * .withTableId("table")); - * }</pre> - * - * <h3>Experimental</h3> - * - * <p>This connector for Cloud Bigtable is considered experimental and may break or receive - * backwards-incompatible changes in future versions of the Cloud Dataflow SDK. Cloud Bigtable is - * in Beta, and thus it may introduce breaking changes in future revisions of its service or APIs. - * - * <h3>Permissions</h3> - * - * <p>Permission requirements depend on the {@link PipelineRunner} that is used to execute the - * Dataflow job. Please refer to the documentation of corresponding - * {@link PipelineRunner PipelineRunners} for more details. - */ -@Experimental -public class BigtableIO { - private static final Logger logger = LoggerFactory.getLogger(BigtableIO.class); - - /** - * Creates an uninitialized {@link BigtableIO.Read}. Before use, the {@code Read} must be - * initialized with a - * {@link BigtableIO.Read#withBigtableOptions(BigtableOptions) BigtableOptions} that specifies - * the source Cloud Bigtable cluster, and a {@link BigtableIO.Read#withTableId tableId} that - * specifies which table to read. A {@link RowFilter} may also optionally be specified using - * {@link BigtableIO.Read#withRowFilter}. - */ - @Experimental - public static Read read() { - return new Read(null, "", null, null); - } - - /** - * Creates an uninitialized {@link BigtableIO.Write}. 
Before use, the {@code Write} must be - * initialized with a - * {@link BigtableIO.Write#withBigtableOptions(BigtableOptions) BigtableOptions} that specifies - * the destination Cloud Bigtable cluster, and a {@link BigtableIO.Write#withTableId tableId} that - * specifies which table to write. - */ - @Experimental - public static Write write() { - return new Write(null, "", null); - } - - /** - * A {@link PTransform} that reads from Google Cloud Bigtable. See the class-level Javadoc on - * {@link BigtableIO} for more information. - * - * @see BigtableIO - */ - @Experimental - public static class Read extends PTransform<PBegin, PCollection<Row>> { - /** - * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable cluster - * indicated by the given options, and using any other specified customizations. - * - * <p>Does not modify this object. - */ - public Read withBigtableOptions(BigtableOptions options) { - checkNotNull(options, "options"); - return withBigtableOptions(options.toBuilder()); - } - - /** - * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable cluster - * indicated by the given options, and using any other specified customizations. - * - * <p>Clones the given {@link BigtableOptions} builder so that any further changes - * will have no effect on the returned {@link BigtableIO.Read}. - * - * <p>Does not modify this object. - */ - public Read withBigtableOptions(BigtableOptions.Builder optionsBuilder) { - checkNotNull(optionsBuilder, "optionsBuilder"); - // TODO: is there a better way to clone a Builder? Want it to be immune from user changes. - BigtableOptions.Builder clonedBuilder = optionsBuilder.build().toBuilder(); - BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getUserAgent()).build(); - return new Read(optionsWithAgent, tableId, filter, bigtableService); - } - - /** - * Returns a new {@link BigtableIO.Read} that will filter the rows read from Cloud Bigtable - * using the given row filter. 
- * - * <p>Does not modify this object. - */ - public Read withRowFilter(RowFilter filter) { - checkNotNull(filter, "filter"); - return new Read(options, tableId, filter, bigtableService); - } - - /** - * Returns a new {@link BigtableIO.Read} that will read from the specified table. - * - * <p>Does not modify this object. - */ - public Read withTableId(String tableId) { - checkNotNull(tableId, "tableId"); - return new Read(options, tableId, filter, bigtableService); - } - - /** - * Returns the Google Cloud Bigtable cluster being read from, and other parameters. - */ - public BigtableOptions getBigtableOptions() { - return options; - } - - /** - * Returns the table being read from. - */ - public String getTableId() { - return tableId; - } - - @Override - public PCollection<Row> apply(PBegin input) { - BigtableSource source = - new BigtableSource(getBigtableService(), tableId, filter, ByteKeyRange.ALL_KEYS, null); - return input.getPipeline().apply(com.google.cloud.dataflow.sdk.io.Read.from(source)); - } - - @Override - public void validate(PBegin input) { - checkArgument(options != null, "BigtableOptions not specified"); - checkArgument(!tableId.isEmpty(), "Table ID not specified"); - try { - checkArgument( - getBigtableService().tableExists(tableId), "Table %s does not exist", tableId); - } catch (IOException e) { - logger.warn("Error checking whether table {} exists; proceeding.", tableId, e); - } - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(Read.class) - .add("options", options) - .add("tableId", tableId) - .add("filter", filter) - .toString(); - } - - ///////////////////////////////////////////////////////////////////////////////////////// - /** - * Used to define the Cloud Bigtable cluster and any options for the networking layer. - * Cannot actually be {@code null} at validation time, but may start out {@code null} while - * source is being built. 
- */ - @Nullable private final BigtableOptions options; - private final String tableId; - @Nullable private final RowFilter filter; - @Nullable private final BigtableService bigtableService; - - private Read( - @Nullable BigtableOptions options, - String tableId, - @Nullable RowFilter filter, - @Nullable BigtableService bigtableService) { - this.options = options; - this.tableId = checkNotNull(tableId, "tableId"); - this.filter = filter; - this.bigtableService = bigtableService; - } - - /** - * Returns a new {@link BigtableIO.Read} that will read using the given Cloud Bigtable - * service implementation. - * - * <p>This is used for testing. - * - * <p>Does not modify this object. - */ - Read withBigtableService(BigtableService bigtableService) { - checkNotNull(bigtableService, "bigtableService"); - return new Read(options, tableId, filter, bigtableService); - } - - /** - * Helper function that either returns the mock Bigtable service supplied by - * {@link #withBigtableService} or creates and returns an implementation that talks to - * {@code Cloud Bigtable}. - */ - private BigtableService getBigtableService() { - if (bigtableService != null) { - return bigtableService; - } - return new BigtableServiceImpl(options); - } - } - - /** - * A {@link PTransform} that writes to Google Cloud Bigtable. See the class-level Javadoc on - * {@link BigtableIO} for more information. - * - * @see BigtableIO - */ - @Experimental - public static class Write - extends PTransform<PCollection<KV<ByteString, Iterable<Mutation>>>, PDone> { - /** - * Used to define the Cloud Bigtable cluster and any options for the networking layer. - * Cannot actually be {@code null} at validation time, but may start out {@code null} while - * source is being built. 
- */ - @Nullable private final BigtableOptions options; - private final String tableId; - @Nullable private final BigtableService bigtableService; - - private Write( - @Nullable BigtableOptions options, - String tableId, - @Nullable BigtableService bigtableService) { - this.options = options; - this.tableId = checkNotNull(tableId, "tableId"); - this.bigtableService = bigtableService; - } - - /** - * Returns a new {@link BigtableIO.Write} that will write to the Cloud Bigtable cluster - * indicated by the given options, and using any other specified customizations. - * - * <p>Does not modify this object. - */ - public Write withBigtableOptions(BigtableOptions options) { - checkNotNull(options, "options"); - return withBigtableOptions(options.toBuilder()); - } - - /** - * Returns a new {@link BigtableIO.Write} that will write to the Cloud Bigtable cluster - * indicated by the given options, and using any other specified customizations. - * - * <p>Clones the given {@link BigtableOptions} builder so that any further changes - * will have no effect on the returned {@link BigtableIO.Write}. - * - * <p>Does not modify this object. - */ - public Write withBigtableOptions(BigtableOptions.Builder optionsBuilder) { - checkNotNull(optionsBuilder, "optionsBuilder"); - // TODO: is there a better way to clone a Builder? Want it to be immune from user changes. - BigtableOptions.Builder clonedBuilder = optionsBuilder.build().toBuilder(); - BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getUserAgent()).build(); - return new Write(optionsWithAgent, tableId, bigtableService); - } - - /** - * Returns a new {@link BigtableIO.Write} that will write to the specified table. - * - * <p>Does not modify this object. - */ - public Write withTableId(String tableId) { - checkNotNull(tableId, "tableId"); - return new Write(options, tableId, bigtableService); - } - - /** - * Returns the Google Cloud Bigtable cluster being written to, and other parameters. 
- */ - public BigtableOptions getBigtableOptions() { - return options; - } - - /** - * Returns the table being written to. - */ - public String getTableId() { - return tableId; - } - - @Override - public PDone apply(PCollection<KV<ByteString, Iterable<Mutation>>> input) { - Sink sink = new Sink(tableId, getBigtableService()); - return input.apply(com.google.cloud.dataflow.sdk.io.Write.to(sink)); - } - - @Override - public void validate(PCollection<KV<ByteString, Iterable<Mutation>>> input) { - checkArgument(options != null, "BigtableOptions not specified"); - checkArgument(!tableId.isEmpty(), "Table ID not specified"); - try { - checkArgument( - getBigtableService().tableExists(tableId), "Table %s does not exist", tableId); - } catch (IOException e) { - logger.warn("Error checking whether table {} exists; proceeding.", tableId, e); - } - } - - /** - * Returns a new {@link BigtableIO.Write} that will write using the given Cloud Bigtable - * service implementation. - * - * <p>This is used for testing. - * - * <p>Does not modify this object. - */ - Write withBigtableService(BigtableService bigtableService) { - checkNotNull(bigtableService, "bigtableService"); - return new Write(options, tableId, bigtableService); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(Write.class) - .add("options", options) - .add("tableId", tableId) - .toString(); - } - - /** - * Helper function that either returns the mock Bigtable service supplied by - * {@link #withBigtableService} or creates and returns an implementation that talks to - * {@code Cloud Bigtable}. - */ - private BigtableService getBigtableService() { - if (bigtableService != null) { - return bigtableService; - } - return new BigtableServiceImpl(options); - } - } - - ////////////////////////////////////////////////////////////////////////////////////////// - /** Disallow construction of utility class. 
*/ - private BigtableIO() {} - - static class BigtableSource extends BoundedSource<Row> { - public BigtableSource( - BigtableService service, - String tableId, - @Nullable RowFilter filter, - ByteKeyRange range, - Long estimatedSizeBytes) { - this.service = service; - this.tableId = tableId; - this.filter = filter; - this.range = range; - this.estimatedSizeBytes = estimatedSizeBytes; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(BigtableSource.class) - .add("tableId", tableId) - .add("filter", filter) - .add("range", range) - .add("estimatedSizeBytes", estimatedSizeBytes) - .toString(); - } - - ////// Private state and internal implementation details ////// - private final BigtableService service; - @Nullable private final String tableId; - @Nullable private final RowFilter filter; - private final ByteKeyRange range; - @Nullable private Long estimatedSizeBytes; - @Nullable private transient List<SampleRowKeysResponse> sampleRowKeys; - - protected BigtableSource withStartKey(ByteKey startKey) { - checkNotNull(startKey, "startKey"); - return new BigtableSource( - service, tableId, filter, range.withStartKey(startKey), estimatedSizeBytes); - } - - protected BigtableSource withEndKey(ByteKey endKey) { - checkNotNull(endKey, "endKey"); - return new BigtableSource( - service, tableId, filter, range.withEndKey(endKey), estimatedSizeBytes); - } - - protected BigtableSource withEstimatedSizeBytes(Long estimatedSizeBytes) { - checkNotNull(estimatedSizeBytes, "estimatedSizeBytes"); - return new BigtableSource(service, tableId, filter, range, estimatedSizeBytes); - } - - /** - * Makes an API call to the Cloud Bigtable service that gives information about tablet key - * boundaries and estimated sizes. We can use these samples to ensure that splits are on - * different tablets, and possibly generate sub-splits within tablets. 
- */ - private List<SampleRowKeysResponse> getSampleRowKeys() throws IOException { - return service.getSampleRowKeys(this); - } - - @Override - public List<BigtableSource> splitIntoBundles( - long desiredBundleSizeBytes, PipelineOptions options) throws Exception { - // Update the desiredBundleSizeBytes in order to limit the - // number of splits to maximumNumberOfSplits. - long maximumNumberOfSplits = 4000; - long sizeEstimate = getEstimatedSizeBytes(options); - desiredBundleSizeBytes = - Math.max(sizeEstimate / maximumNumberOfSplits, desiredBundleSizeBytes); - - // Delegate to testable helper. - return splitIntoBundlesBasedOnSamples(desiredBundleSizeBytes, getSampleRowKeys()); - } - - /** Helper that splits this source into bundles based on Cloud Bigtable sampled row keys. */ - private List<BigtableSource> splitIntoBundlesBasedOnSamples( - long desiredBundleSizeBytes, List<SampleRowKeysResponse> sampleRowKeys) { - // There are no regions, or no samples available. Just scan the entire range. - if (sampleRowKeys.isEmpty()) { - logger.info("Not splitting source {} because no sample row keys are available.", this); - return Collections.singletonList(this); - } - - logger.info( - "About to split into bundles of size {} with sampleRowKeys length {} first element {}", - desiredBundleSizeBytes, - sampleRowKeys.size(), - sampleRowKeys.get(0)); - - // Loop through all sampled responses and generate splits from the ones that overlap the - // scan range. The main complication is that we must track the end range of the previous - // sample to generate good ranges. 
- ByteKey lastEndKey = ByteKey.EMPTY; - long lastOffset = 0; - ImmutableList.Builder<BigtableSource> splits = ImmutableList.builder(); - for (SampleRowKeysResponse response : sampleRowKeys) { - ByteKey responseEndKey = ByteKey.of(response.getRowKey()); - long responseOffset = response.getOffsetBytes(); - checkState( - responseOffset >= lastOffset, - "Expected response byte offset %s to come after the last offset %s", - responseOffset, - lastOffset); - - if (!range.overlaps(ByteKeyRange.of(lastEndKey, responseEndKey))) { - // This region does not overlap the scan, so skip it. - lastOffset = responseOffset; - lastEndKey = responseEndKey; - continue; - } - - // Calculate the beginning of the split as the larger of startKey and the end of the last - // split. Unspecified start is smallest key so is correctly treated as earliest key. - ByteKey splitStartKey = lastEndKey; - if (splitStartKey.compareTo(range.getStartKey()) < 0) { - splitStartKey = range.getStartKey(); - } - - // Calculate the end of the split as the smaller of endKey and the end of this sample. Note - // that range.containsKey handles the case when range.getEndKey() is empty. - ByteKey splitEndKey = responseEndKey; - if (!range.containsKey(splitEndKey)) { - splitEndKey = range.getEndKey(); - } - - // We know this region overlaps the desired key range, and we know a rough estimate of its - // size. Split the key range into bundle-sized chunks and then add them all as splits. - long sampleSizeBytes = responseOffset - lastOffset; - List<BigtableSource> subSplits = - splitKeyRangeIntoBundleSizedSubranges( - sampleSizeBytes, - desiredBundleSizeBytes, - ByteKeyRange.of(splitStartKey, splitEndKey)); - splits.addAll(subSplits); - - // Move to the next region. - lastEndKey = responseEndKey; - lastOffset = responseOffset; - } - - // We must add one more region after the end of the samples if both these conditions hold: - // 1. we did not scan to the end yet (lastEndKey is concrete, not 0-length). - // 2. 
we want to scan to the end (endKey is empty) or farther (lastEndKey < endKey). - if (!lastEndKey.isEmpty() - && (range.getEndKey().isEmpty() || lastEndKey.compareTo(range.getEndKey()) < 0)) { - splits.add(this.withStartKey(lastEndKey).withEndKey(range.getEndKey())); - } - - List<BigtableSource> ret = splits.build(); - logger.info("Generated {} splits. First split: {}", ret.size(), ret.get(0)); - return ret; - } - - @Override - public long getEstimatedSizeBytes(PipelineOptions options) throws IOException { - // Delegate to testable helper. - if (estimatedSizeBytes == null) { - estimatedSizeBytes = getEstimatedSizeBytesBasedOnSamples(getSampleRowKeys()); - } - return estimatedSizeBytes; - } - - /** - * Computes the estimated size in bytes based on the total size of all samples that overlap - * the key range this source will scan. - */ - private long getEstimatedSizeBytesBasedOnSamples(List<SampleRowKeysResponse> samples) { - long estimatedSizeBytes = 0; - long lastOffset = 0; - ByteKey currentStartKey = ByteKey.EMPTY; - // Compute the total estimated size as the size of each sample that overlaps the scan range. - // TODO: In future, Bigtable service may provide finer grained APIs, e.g., to sample given a - // filter or to sample on a given key range. - for (SampleRowKeysResponse response : samples) { - ByteKey currentEndKey = ByteKey.of(response.getRowKey()); - long currentOffset = response.getOffsetBytes(); - if (!currentStartKey.isEmpty() && currentStartKey.equals(currentEndKey)) { - // Skip an empty region. - lastOffset = currentOffset; - continue; - } else if (range.overlaps(ByteKeyRange.of(currentStartKey, currentEndKey))) { - estimatedSizeBytes += currentOffset - lastOffset; - } - currentStartKey = currentEndKey; - lastOffset = currentOffset; - } - return estimatedSizeBytes; - } - - /** - * Cloud Bigtable returns query results ordered by key. 
- */ - @Override - public boolean producesSortedKeys(PipelineOptions options) throws Exception { - return true; - } - - @Override - public BoundedReader<Row> createReader(PipelineOptions options) throws IOException { - return new BigtableReader(this, service); - } - - @Override - public void validate() { - checkArgument(!tableId.isEmpty(), "tableId cannot be empty"); - } - - @Override - public Coder<Row> getDefaultOutputCoder() { - return Proto2Coder.of(Row.class); - } - - /** Helper that splits the specified range in this source into bundles. */ - private List<BigtableSource> splitKeyRangeIntoBundleSizedSubranges( - long sampleSizeBytes, long desiredBundleSizeBytes, ByteKeyRange range) { - // Catch the trivial cases. Split is small enough already, or this is the last region. - logger.debug( - "Subsplit for sampleSizeBytes {} and desiredBundleSizeBytes {}", - sampleSizeBytes, - desiredBundleSizeBytes); - if (sampleSizeBytes <= desiredBundleSizeBytes) { - return Collections.singletonList( - this.withStartKey(range.getStartKey()).withEndKey(range.getEndKey())); - } - - checkArgument( - sampleSizeBytes > 0, "Sample size %s bytes must be greater than 0.", sampleSizeBytes); - checkArgument( - desiredBundleSizeBytes > 0, - "Desired bundle size %s bytes must be greater than 0.", - desiredBundleSizeBytes); - - int splitCount = (int) Math.ceil(((double) sampleSizeBytes) / (desiredBundleSizeBytes)); - List<ByteKey> splitKeys = range.split(splitCount); - ImmutableList.Builder<BigtableSource> splits = ImmutableList.builder(); - Iterator<ByteKey> keys = splitKeys.iterator(); - ByteKey prev = keys.next(); - while (keys.hasNext()) { - ByteKey next = keys.next(); - splits.add( - this - .withStartKey(prev) - .withEndKey(next) - .withEstimatedSizeBytes(sampleSizeBytes / splitCount)); - prev = next; - } - return splits.build(); - } - - public ByteKeyRange getRange() { - return range; - } - - public RowFilter getRowFilter() { - return filter; - } - - public String getTableId() { - 
return tableId; - } - } - - private static class BigtableReader extends BoundedReader<Row> { - // Thread-safety: source is protected via synchronization and is only accessed or modified - // inside a synchronized block (or constructor, which is the same). - private BigtableSource source; - private BigtableService service; - private BigtableService.Reader reader; - private final ByteKeyRangeTracker rangeTracker; - private long recordsReturned; - - public BigtableReader(BigtableSource source, BigtableService service) { - this.source = source; - this.service = service; - rangeTracker = ByteKeyRangeTracker.of(source.getRange()); - } - - @Override - public boolean start() throws IOException { - reader = service.createReader(getCurrentSource()); - boolean hasRecord = - reader.start() - && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey())); - if (hasRecord) { - ++recordsReturned; - } - return hasRecord; - } - - @Override - public synchronized BigtableSource getCurrentSource() { - return source; - } - - @Override - public boolean advance() throws IOException { - boolean hasRecord = - reader.advance() - && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey())); - if (hasRecord) { - ++recordsReturned; - } - return hasRecord; - } - - @Override - public Row getCurrent() throws NoSuchElementException { - return reader.getCurrentRow(); - } - - @Override - public void close() throws IOException { - logger.info("Closing reader after reading {} records.", recordsReturned); - if (reader != null) { - reader.close(); - reader = null; - } - } - - @Override - public final Double getFractionConsumed() { - return rangeTracker.getFractionConsumed(); - } - - @Override - public final synchronized BigtableSource splitAtFraction(double fraction) { - ByteKey splitKey; - try { - splitKey = source.getRange().interpolateKey(fraction); - } catch (IllegalArgumentException e) { - logger.info("%s: Failed to interpolate key for fraction %s.", 
source.getRange(), fraction); - return null; - } - logger.debug( - "Proposing to split {} at fraction {} (key {})", rangeTracker, fraction, splitKey); - if (!rangeTracker.trySplitAtPosition(splitKey)) { - return null; - } - BigtableSource primary = source.withEndKey(splitKey); - BigtableSource residual = source.withStartKey(splitKey); - this.source = primary; - return residual; - } - } - - private static class Sink - extends com.google.cloud.dataflow.sdk.io.Sink<KV<ByteString, Iterable<Mutation>>> { - - public Sink(String tableId, BigtableService bigtableService) { - this.tableId = checkNotNull(tableId, "tableId"); - this.bigtableService = checkNotNull(bigtableService, "bigtableService"); - } - - public String getTableId() { - return tableId; - } - - public BigtableService getBigtableService() { - return bigtableService; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(Sink.class) - .add("bigtableService", bigtableService) - .add("tableId", tableId) - .toString(); - } - - /////////////////////////////////////////////////////////////////////////////// - private final String tableId; - private final BigtableService bigtableService; - - @Override - public WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> createWriteOperation( - PipelineOptions options) { - return new BigtableWriteOperation(this); - } - - /** Does nothing, as it is redundant with {@link Write#validate}. 
*/ - @Override - public void validate(PipelineOptions options) {} - } - - private static class BigtableWriteOperation - extends WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> { - private final Sink sink; - - public BigtableWriteOperation(Sink sink) { - this.sink = sink; - } - - @Override - public Writer<KV<ByteString, Iterable<Mutation>>, Long> createWriter(PipelineOptions options) - throws Exception { - return new BigtableWriter(this); - } - - @Override - public void initialize(PipelineOptions options) {} - - @Override - public void finalize(Iterable<Long> writerResults, PipelineOptions options) { - long count = 0; - for (Long value : writerResults) { - value += count; - } - logger.debug("Wrote {} elements to BigtableIO.Sink {}", sink); - } - - @Override - public Sink getSink() { - return sink; - } - - @Override - public Coder<Long> getWriterResultCoder() { - return VarLongCoder.of(); - } - } - - private static class BigtableWriter extends Writer<KV<ByteString, Iterable<Mutation>>, Long> { - private final BigtableWriteOperation writeOperation; - private final Sink sink; - private BigtableService.Writer bigtableWriter; - private long recordsWritten; - private final ConcurrentLinkedQueue<BigtableWriteException> failures; - - public BigtableWriter(BigtableWriteOperation writeOperation) { - this.writeOperation = writeOperation; - this.sink = writeOperation.getSink(); - this.failures = new ConcurrentLinkedQueue<>(); - } - - @Override - public void open(String uId) throws Exception { - bigtableWriter = sink.getBigtableService().openForWriting(sink.getTableId()); - recordsWritten = 0; - } - - /** - * If any write has asynchronously failed, fail the bundle with a useful error. - */ - private void checkForFailures() throws IOException { - // Note that this function is never called by multiple threads and is the only place that - // we remove from failures, so this code is safe. 
- if (failures.isEmpty()) { - return; - } - - StringBuilder logEntry = new StringBuilder(); - int i = 0; - for (; i < 10 && !failures.isEmpty(); ++i) { - BigtableWriteException exc = failures.remove(); - logEntry.append("\n").append(exc.getMessage()); - if (exc.getCause() != null) { - logEntry.append(": ").append(exc.getCause().getMessage()); - } - } - String message = - String.format( - "At least %d errors occurred writing to Bigtable. First %d errors: %s", - i + failures.size(), - i, - logEntry.toString()); - logger.error(message); - throw new IOException(message); - } - - @Override - public void write(KV<ByteString, Iterable<Mutation>> rowMutations) throws Exception { - checkForFailures(); - Futures.addCallback( - bigtableWriter.writeRecord(rowMutations), new WriteExceptionCallback(rowMutations)); - ++recordsWritten; - } - - @Override - public Long close() throws Exception { - bigtableWriter.close(); - bigtableWriter = null; - checkForFailures(); - logger.info("Wrote {} records", recordsWritten); - return recordsWritten; - } - - @Override - public WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> getWriteOperation() { - return writeOperation; - } - - private class WriteExceptionCallback implements FutureCallback<Empty> { - private final KV<ByteString, Iterable<Mutation>> value; - - public WriteExceptionCallback(KV<ByteString, Iterable<Mutation>> value) { - this.value = value; - } - - @Override - public void onFailure(Throwable cause) { - failures.add(new BigtableWriteException(value, cause)); - } - - @Override - public void onSuccess(Empty produced) {} - } - } - - /** - * An exception that puts information about the failed record being written in its message. 
- */ - static class BigtableWriteException extends IOException { - public BigtableWriteException(KV<ByteString, Iterable<Mutation>> record, Throwable cause) { - super( - String.format( - "Error mutating row %s with mutations %s", - record.getKey().toStringUtf8(), - record.getValue()), - cause); - } - } - - /** - * A helper function to produce a Cloud Bigtable user agent string. - */ - private static String getUserAgent() { - String javaVersion = System.getProperty("java.specification.version"); - DataflowReleaseInfo info = DataflowReleaseInfo.getReleaseInfo(); - return String.format( - "%s/%s (%s); %s", - info.getName(), - info.getVersion(), - javaVersion, - "0.2.3" /* TODO get Bigtable client version directly from jar. */); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableService.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableService.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableService.java deleted file mode 100644 index 0c47f65..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableService.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.io.bigtable; - -import com.google.bigtable.v1.Mutation; -import com.google.bigtable.v1.Row; -import com.google.bigtable.v1.SampleRowKeysResponse; -import com.google.cloud.dataflow.sdk.io.bigtable.BigtableIO.BigtableSource; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.protobuf.ByteString; -import com.google.protobuf.Empty; - -import java.io.IOException; -import java.io.Serializable; -import java.util.List; -import java.util.NoSuchElementException; - -/** - * An interface for real or fake implementations of Cloud Bigtable. - */ -interface BigtableService extends Serializable { - - /** - * The interface of a class that can write to Cloud Bigtable. - */ - interface Writer { - /** - * Writes a single row transaction to Cloud Bigtable. The key of the {@code record} is the - * row key to be mutated and the iterable of mutations represent the changes to be made to the - * row. - * - * @throws IOException if there is an error submitting the write. - */ - ListenableFuture<Empty> writeRecord(KV<ByteString, Iterable<Mutation>> record) - throws IOException; - - /** - * Closes the writer. - * - * @throws IOException if any writes did not succeed - */ - void close() throws IOException; - } - - /** - * The interface of a class that reads from Cloud Bigtable. - */ - interface Reader { - /** - * Reads the first element (including initialization, such as opening a network connection) and - * returns true if an element was found. 
- */ - boolean start() throws IOException; - - /** - * Attempts to read the next element, and returns true if an element has been read. - */ - boolean advance() throws IOException; - - /** - * Closes the reader. - * - * @throws IOException if there is an error. - */ - void close() throws IOException; - - /** - * Returns the last row read by a successful start() or advance(), or throws if there is no - * current row because the last such call was unsuccessful. - */ - Row getCurrentRow() throws NoSuchElementException; - } - - /** - * Returns {@code true} if the table with the give name exists. - */ - boolean tableExists(String tableId) throws IOException; - - /** - * Returns a {@link Reader} that will read from the specified source. - */ - Reader createReader(BigtableSource source) throws IOException; - - /** - * Returns a {@link Writer} that will write to the specified table. - */ - Writer openForWriting(String tableId) throws IOException; - - /** - * Returns a set of row keys sampled from the underlying table. These contain information about - * the distribution of keys within the table. - */ - List<SampleRowKeysResponse> getSampleRowKeys(BigtableSource source) throws IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableServiceImpl.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableServiceImpl.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableServiceImpl.java deleted file mode 100644 index 9f32022..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableServiceImpl.java +++ /dev/null @@ -1,243 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.io.bigtable; - -import com.google.bigtable.admin.table.v1.GetTableRequest; -import com.google.bigtable.v1.MutateRowRequest; -import com.google.bigtable.v1.Mutation; -import com.google.bigtable.v1.ReadRowsRequest; -import com.google.bigtable.v1.Row; -import com.google.bigtable.v1.RowRange; -import com.google.bigtable.v1.SampleRowKeysRequest; -import com.google.bigtable.v1.SampleRowKeysResponse; -import com.google.cloud.bigtable.config.BigtableOptions; -import com.google.cloud.bigtable.grpc.BigtableSession; -import com.google.cloud.bigtable.grpc.async.AsyncExecutor; -import com.google.cloud.bigtable.grpc.async.HeapSizeManager; -import com.google.cloud.bigtable.grpc.scanner.ResultScanner; -import com.google.cloud.dataflow.sdk.io.bigtable.BigtableIO.BigtableSource; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.common.base.MoreObjects; -import com.google.common.io.Closer; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.protobuf.ByteString; -import com.google.protobuf.Empty; - -import io.grpc.Status.Code; -import io.grpc.StatusRuntimeException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.List; -import 
java.util.NoSuchElementException; - -/** - * An implementation of {@link BigtableService} that actually communicates with the Cloud Bigtable - * service. - */ -class BigtableServiceImpl implements BigtableService { - private static final Logger logger = LoggerFactory.getLogger(BigtableService.class); - - public BigtableServiceImpl(BigtableOptions options) { - this.options = options; - } - - private final BigtableOptions options; - - @Override - public BigtableWriterImpl openForWriting(String tableId) throws IOException { - BigtableSession session = new BigtableSession(options); - String tableName = options.getClusterName().toTableNameStr(tableId); - return new BigtableWriterImpl(session, tableName); - } - - @Override - public boolean tableExists(String tableId) throws IOException { - if (!BigtableSession.isAlpnProviderEnabled()) { - logger.info( - "Skipping existence check for table {} (BigtableOptions {}) because ALPN is not" - + " configured.", - tableId, - options); - return true; - } - - try (BigtableSession session = new BigtableSession(options)) { - GetTableRequest getTable = - GetTableRequest.newBuilder() - .setName(options.getClusterName().toTableNameStr(tableId)) - .build(); - session.getTableAdminClient().getTable(getTable); - return true; - } catch (StatusRuntimeException e) { - if (e.getStatus().getCode() == Code.NOT_FOUND) { - return false; - } - String message = - String.format( - "Error checking whether table %s (BigtableOptions %s) exists", tableId, options); - logger.error(message, e); - throw new IOException(message, e); - } - } - - private class BigtableReaderImpl implements Reader { - private BigtableSession session; - private final BigtableSource source; - private ResultScanner<Row> results; - private Row currentRow; - - public BigtableReaderImpl(BigtableSession session, BigtableSource source) { - this.session = session; - this.source = source; - } - - @Override - public boolean start() throws IOException { - RowRange range = - 
RowRange.newBuilder() - .setStartKey(source.getRange().getStartKey().getValue()) - .setEndKey(source.getRange().getEndKey().getValue()) - .build(); - ReadRowsRequest.Builder requestB = - ReadRowsRequest.newBuilder() - .setRowRange(range) - .setTableName(options.getClusterName().toTableNameStr(source.getTableId())); - if (source.getRowFilter() != null) { - requestB.setFilter(source.getRowFilter()); - } - results = session.getDataClient().readRows(requestB.build()); - return advance(); - } - - @Override - public boolean advance() throws IOException { - currentRow = results.next(); - return (currentRow != null); - } - - @Override - public void close() throws IOException { - // Goal: by the end of this function, both results and session are null and closed, - // independent of what errors they throw or prior state. - - if (session == null) { - // Only possible when previously closed, so we know that results is also null. - return; - } - - // Session does not implement Closeable -- it's AutoCloseable. So we can't register it with - // the Closer, but we can use the Closer to simplify the error handling. 
- try (Closer closer = Closer.create()) { - if (results != null) { - closer.register(results); - results = null; - } - - session.close(); - } finally { - session = null; - } - } - - @Override - public Row getCurrentRow() throws NoSuchElementException { - if (currentRow == null) { - throw new NoSuchElementException(); - } - return currentRow; - } - } - - private static class BigtableWriterImpl implements Writer { - private BigtableSession session; - private AsyncExecutor executor; - private final MutateRowRequest.Builder partialBuilder; - - public BigtableWriterImpl(BigtableSession session, String tableName) { - this.session = session; - this.executor = - new AsyncExecutor( - session.getDataClient(), - new HeapSizeManager( - AsyncExecutor.ASYNC_MUTATOR_MAX_MEMORY_DEFAULT, - AsyncExecutor.MAX_INFLIGHT_RPCS_DEFAULT)); - - partialBuilder = MutateRowRequest.newBuilder().setTableName(tableName); - } - - @Override - public void close() throws IOException { - try { - if (executor != null) { - executor.flush(); - executor = null; - } - } finally { - if (session != null) { - session.close(); - session = null; - } - } - } - - @Override - public ListenableFuture<Empty> writeRecord(KV<ByteString, Iterable<Mutation>> record) - throws IOException { - MutateRowRequest r = - partialBuilder - .clone() - .setRowKey(record.getKey()) - .addAllMutations(record.getValue()) - .build(); - try { - return executor.mutateRowAsync(r); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException("Write interrupted", e); - } - } - } - - @Override - public String toString() { - return MoreObjects - .toStringHelper(BigtableServiceImpl.class) - .add("options", options) - .toString(); - } - - @Override - public Reader createReader(BigtableSource source) throws IOException { - BigtableSession session = new BigtableSession(options); - return new BigtableReaderImpl(session, source); - } - - @Override - public List<SampleRowKeysResponse> 
getSampleRowKeys(BigtableSource source) throws IOException { - try (BigtableSession session = new BigtableSession(options)) { - SampleRowKeysRequest request = - SampleRowKeysRequest.newBuilder() - .setTableName(options.getClusterName().toTableNameStr(source.getTableId())) - .build(); - return session.getDataClient().sampleRowKeys(request); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/package-info.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/package-info.java deleted file mode 100644 index 553f46c..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Defines transforms for reading and writing from Google Cloud Bigtable. 
- * - * @see com.google.cloud.dataflow.sdk.io.bigtable.BigtableIO - */ -package com.google.cloud.dataflow.sdk.io.bigtable; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/package-info.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/package-info.java deleted file mode 100644 index 5f0050d..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/package-info.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Defines transforms for reading and writing common storage formats, including - * {@link com.google.cloud.dataflow.sdk.io.AvroIO}, - * {@link com.google.cloud.dataflow.sdk.io.BigQueryIO}, and - * {@link com.google.cloud.dataflow.sdk.io.TextIO}. 
- * - * <p>The classes in this package provide {@code Read} transforms that create PCollections - * from existing storage: - * <pre>{@code - * PCollection<TableRow> inputData = pipeline.apply( - * BigQueryIO.Read.named("Read") - * .from("clouddataflow-readonly:samples.weather_stations"); - * }</pre> - * and {@code Write} transforms that persist PCollections to external storage: - * <pre> {@code - * PCollection<Integer> numbers = ...; - * numbers.apply(TextIO.Write.named("WriteNumbers") - * .to("gs://my_bucket/path/to/numbers")); - * } </pre> - */ -package com.google.cloud.dataflow.sdk.io; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKey.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKey.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKey.java deleted file mode 100644 index 5b9a003..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKey.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
package com.google.cloud.dataflow.sdk.io.range;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.protobuf.ByteString;
import com.google.protobuf.ByteString.ByteIterator;

import java.io.Serializable;

/**
 * An immutable key made up of an arbitrary-length sequence of bytes, as used by key-value stores
 * such as Google Cloud Bigtable.
 *
 * <p>Keys are ordered by {@link #compareTo} in lexicographic order of their unsigned bytes. The
 * zero-length key is the minimum; appending a 0 byte to any key yields its successor; key length
 * is unbounded.
 *
 * <p>Although the empty key sorts before every other key, some systems treat an empty upper bound
 * as representing the largest possible key. Callers implementing such semantics should test bound
 * keys with {@link #isEmpty}.
 */
public final class ByteKey implements Comparable<ByteKey>, Serializable {
  /** An empty key. */
  public static final ByteKey EMPTY = ByteKey.of();

  /** Lowercase hex digits used when rendering the key in {@link #toString}. */
  private static final String HEX_DIGITS = "0123456789abcdef";

  /** The immutable byte content of this key. */
  private final ByteString value;

  private ByteKey(ByteString value) {
    this.value = value;
  }

  /** Returns a {@link ByteKey} that wraps the given {@link ByteString} without copying it. */
  public static ByteKey of(ByteString value) {
    return new ByteKey(value);
  }

  /**
   * Returns a {@link ByteKey} holding a copy of the given {@code byte[]}.
   *
   * <p>The input array is copied, so later mutation of it does not affect the key.
   */
  public static ByteKey copyFrom(byte[] bytes) {
    return of(ByteString.copyFrom(bytes));
  }

  /**
   * Returns a {@link ByteKey} built from the low byte of each given int. Convenient for writing
   * byte values above 127 in code without casting down to signed Java {@link Byte bytes}:
   *
   * <pre>{@code
   * ByteKey key = ByteKey.of(0xde, 0xad, 0xbe, 0xef);
   * }</pre>
   *
   * <p>The input is copied.
   */
  public static ByteKey of(int... bytes) {
    byte[] copy = new byte[bytes.length];
    for (int idx = 0; idx < bytes.length; ++idx) {
      // Keep only the low 8 bits of each int.
      copy[idx] = (byte) (bytes[idx] & 0xff);
    }
    return ByteKey.copyFrom(copy);
  }

  /**
   * Returns the {@link ByteString} underlying this key.
   *
   * <p>No copy is made; the returned value is immutable.
   */
  public ByteString getValue() {
    return value;
  }

  /**
   * Returns this key's bytes as a freshly allocated {@code byte[]}.
   *
   * <p>The underlying bytes are copied.
   */
  public byte[] getBytes() {
    return value.toByteArray();
  }

  /** Returns {@code true} when this key has length 0. */
  public boolean isEmpty() {
    return value.isEmpty();
  }

  /**
   * Compares keys lexicographically by unsigned byte value. The zero-length key is the minimum;
   * a key's successor is that key with an additional 0 byte appended; key length is unbounded.
   */
  @Override
  public int compareTo(ByteKey other) {
    checkNotNull(other, "other");
    ByteIterator left = value.iterator();
    ByteIterator right = other.value.iterator();
    while (left.hasNext() && right.hasNext()) {
      // Masking with 0xff maps signed bytes [-128,127] onto unsigned ints [0,255].
      int diff = (left.nextByte() & 0xff) - (right.nextByte() & 0xff);
      if (diff != 0) {
        return diff;
      }
    }
    // One key is a prefix of the other (or they are identical); the longer key sorts later.
    // Both sizes are non-negative ints, so the subtraction cannot overflow.
    return value.size() - other.value.size();
  }

  /** Renders the key as bracketed lowercase hex, e.g. {@code "[deadbeef]"}. */
  @Override
  public String toString() {
    StringBuilder out = new StringBuilder(2 * value.size() + 2);
    out.append('[');
    ByteIterator bytes = value.iterator();
    while (bytes.hasNext()) {
      byte b = bytes.nextByte();
      out.append(HEX_DIGITS.charAt((b & 0xf0) >>> 4));
      out.append(HEX_DIGITS.charAt(b & 0xf));
    }
    out.append(']');
    return out.toString();
  }

  @Override
  public boolean equals(Object o) {
    if (o == this) {
      return true;
    }
    if (!(o instanceof ByteKey)) {
      return false;
    }
    ByteKey that = (ByteKey) o;
    // Equal length plus compareTo == 0 together imply byte-for-byte equality.
    return that.value.size() == value.size() && this.compareTo(that) == 0;
  }

  @Override
  public int hashCode() {
    return value.hashCode();
  }
}