Repository: beam
Updated Branches:
  refs/heads/master 0d639e63d -> ed0d4577d
[BEAM-1157] Add HBaseIO

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/89fae457
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/89fae457
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/89fae457

Branch: refs/heads/master
Commit: 89fae457e7dc5e0ab1102e2ecdb2a23004f198ba
Parents: 0d639e6
Author: Ismaël Mejía <[email protected]>
Authored: Thu Feb 9 15:56:18 2017 +0100
Committer: Dan Halperin <[email protected]>
Committed: Tue Feb 21 23:19:20 2017 -0800

----------------------------------------------------------------------
 pom.xml                                         |   6 +
 sdks/java/io/hbase/pom.xml                      | 238 +++
 .../org/apache/beam/sdk/io/hbase/HBaseIO.java   | 696 +++++++++++++++
 .../sdk/io/hbase/coders/HBaseMutationCoder.java |  71 ++
 .../sdk/io/hbase/coders/HBaseResultCoder.java   |  54 ++
 .../hbase/coders/SerializableConfiguration.java |  50 ++
 .../sdk/io/hbase/coders/SerializableScan.java   |  49 ++
 .../beam/sdk/io/hbase/coders/package-info.java  |  24 +
 .../apache/beam/sdk/io/hbase/package-info.java  |  24 +
 .../apache/beam/sdk/io/hbase/HBaseIOTest.java   | 430 ++++++
 sdks/java/io/pom.xml                            |   5 +-
 sdks/java/javadoc/pom.xml                       |   5 +
 12 files changed, 1650 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/89fae457/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a8f99fd..0a7b79f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -384,6 +384,12 @@
     <dependency>
       <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-hbase</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-io-hdfs</artifactId>
       <version>${project.version}</version>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/89fae457/sdks/java/io/hbase/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/pom.xml b/sdks/java/io/hbase/pom.xml
new file mode 100644
index 0000000..23582d2
--- /dev/null
+++ b/sdks/java/io/hbase/pom.xml
@@ -0,0 +1,238 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements. See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License. You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-parent</artifactId> + <version>0.6.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>beam-sdks-java-io-hbase</artifactId> + <name>Apache Beam :: SDKs :: Java :: IO :: HBase</name> + <description>Library to read and write from/to HBase</description> + + <properties> + <hbase.version>1.3.0</hbase.version> + <hbase.guava.version>12.0.1</hbase.guava.version> + <hbase.protobuf.version>2.5.0</hbase.protobuf.version> + <hadoop.version>2.5.1</hadoop.version> + </properties> + + <build> + <pluginManagement> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>bundle-and-repackage</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <createDependencyReducedPom>false</createDependencyReducedPom> + <artifactSet> + <includes> + <include>com.google.guava:guava</include> + <include>com.google.protobuf:protobuf-java</include> + </includes> + </artifactSet> + <relocations> + <relocation> + <pattern>com.google.common</pattern> + <shadedPattern>org.apache.beam.sdk.io.hbase.repackaged.com.google.common</shadedPattern> + </relocation> + <relocation> + <pattern>com.google.thirdparty</pattern> + <shadedPattern>org.apache.beam.sdk.io.hbase.repackaged.com.google.thirdparty</shadedPattern> + </relocation> + <relocation> + <pattern>com.google.protobuf</pattern> + <shadedPattern>org.apache.beam.sdk.io.hbase.repackaged.com.google.protobuf</shadedPattern> + </relocation> + </relocations> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </pluginManagement> + + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-core</artifactId> + <exclusions> + <!-- exclude beam version of protobuf --> + <exclusion> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-lite</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${hbase.guava.version}</version> + </dependency> + + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + <scope>provided</scope> + <version>${hbase.protobuf.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-client</artifactId> + <version>${hbase.version}</version> + </dependency> + + <dependency> + 
<groupId>org.apache.hbase</groupId> + <artifactId>hbase-protocol</artifactId> + <version>${hbase.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-common</artifactId> + <version>${hbase.version}</version> + </dependency> + + <dependency> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + </dependency> + + <!-- test dependencies --> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-server</artifactId> + <version>${hbase.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-server</artifactId> + <version>${hbase.version}</version> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + <version>${hadoop.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-hadoop-compat</artifactId> + <version>${hbase.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-hadoop2-compat</artifactId> + <version>${hbase.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + <version>2.6</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-core</artifactId> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-direct-java</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/beam/blob/89fae457/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java new file mode 100644 index 0000000..3b5f4da --- /dev/null +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java @@ -0,0 +1,696 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hbase; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.protobuf.ByteString; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.TreeSet; +import javax.annotation.Nullable; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.ByteStringCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.hbase.coders.HBaseMutationCoder; +import org.apache.beam.sdk.io.hbase.coders.HBaseResultCoder; +import org.apache.beam.sdk.io.hbase.coders.SerializableConfiguration; +import org.apache.beam.sdk.io.hbase.coders.SerializableScan; +import org.apache.beam.sdk.io.range.ByteKey; +import org.apache.beam.sdk.io.range.ByteKeyRange; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.hadoop.conf.Configuration; + +import org.apache.hadoop.hbase.ClusterStatus; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RegionLoad; +import org.apache.hadoop.hbase.ServerLoad; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.BufferedMutatorParams; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.util.Bytes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A bounded source and sink for HBase. + * + * <p>For more information, see the online documentation at + * <a href="https://hbase.apache.org/">HBase</a>. + * + * <h3>Reading from HBase</h3> + * + * <p>The HBase source returns a set of rows from a single table, returning a + * {@code PCollection<Result>}. + * + * <p>To configure a HBase source, you must supply a table id and a {@link Configuration} + * to identify the HBase instance. By default, {@link HBaseIO.Read} will read all rows in the + * table. The row range to be read can optionally be restricted using with a {@link Scan} object + * or using the {@link HBaseIO.Read#withKeyRange}, and a {@link Filter} using + * {@link HBaseIO.Read#withFilter}, for example: + * + * <pre>{@code + * // Scan the entire table. 
+ * p.apply("read", + * HBaseIO.read() + * .withConfiguration(configuration) + * .withTableId("table")); + * + * // Filter data using a HBaseIO Scan + * Scan scan = ... + * p.apply("read", + * HBaseIO.read() + * .withConfiguration(configuration) + * .withTableId("table")) + * .withScan(scan)); + * + * // Scan a prefix of the table. + * ByteKeyRange keyRange = ...; + * p.apply("read", + * HBaseIO.read() + * .withConfiguration(configuration) + * .withTableId("table") + * .withKeyRange(keyRange)); + * + * // Scan a subset of rows that match the specified row filter. + * p.apply("filtered read", + * HBaseIO.read() + * .withConfiguration(configuration) + * .withTableId("table") + * .withFilter(filter)); + * }</pre> + * + * <h3>Writing to HBase</h3> + * + * <p>The HBase sink executes a set of row mutations on a single table. It takes as input a + * {@link PCollection PCollection<KV<ByteString, Iterable<Mutation>>>}, where the + * {@link ByteString} is the key of the row being mutated, and each {@link Mutation} represents an + * idempotent transformation to that row. + * + * <p>To configure a HBase sink, you must supply a table id and a {@link Configuration} + * to identify the HBase instance, for example: + * + * <pre>{@code + * Configuration configuration = ...; + * PCollection<KV<ByteString, Iterable<Mutation>>> data = ...; + * data.setCoder(HBaseIO.WRITE_CODER); + * + * data.apply("write", + * HBaseIO.write() + * .withConfiguration(configuration) + * .withTableId("table")); + * }</pre> + * + * <h3>Experimental</h3> + * + * <p>The design of the API for HBaseIO is currently related to the BigtableIO one, + * it can evolve or be different in some aspects, but the idea is that users can easily migrate + * from one to the other</p>. + */ +@Experimental +public class HBaseIO { + private static final Logger LOG = LoggerFactory.getLogger(HBaseIO.class); + + /** Disallow construction of utility class. */ + private HBaseIO() { + } + + /** + * Creates an uninitialized {@link HBaseIO.Read}. Before use, the {@code Read} must be + * initialized with a + * {@link HBaseIO.Read#withConfiguration(Configuration)} that specifies + * the HBase instance, and a {@link HBaseIO.Read#withTableId tableId} that + * specifies which table to read. A {@link Filter} may also optionally be specified using + * {@link HBaseIO.Read#withFilter}. + */ + @Experimental + public static Read read() { + return new Read(null, "", new SerializableScan(new Scan())); + } + + /** + * A {@link PTransform} that reads from HBase. See the class-level Javadoc on + * {@link HBaseIO} for more information. + * + * @see HBaseIO + */ + public static class Read extends PTransform<PBegin, PCollection<Result>> { + /** + * Returns a new {@link HBaseIO.Read} that will read from the HBase instance + * indicated by the given configuration. + */ + public Read withConfiguration(Configuration configuration) { + checkNotNull(configuration, "conf"); + return new Read(new SerializableConfiguration(configuration), + tableId, serializableScan); + } + + /** + * Returns a new {@link HBaseIO.Read} that will read from the specified table. + * + * <p>Does not modify this object. + */ + public Read withTableId(String tableId) { + checkNotNull(tableId, "tableId"); + return new Read(serializableConfiguration, tableId, serializableScan); + } + + /** + * Returns a new {@link HBaseIO.Read} that will filter the rows read from HBase + * using the given scan. + * + * <p>Does not modify this object. 
+ */ + public Read withScan(Scan scan) { + checkNotNull(scan, "scan"); + return new Read(serializableConfiguration, tableId, new SerializableScan(scan)); + } + + /** + * Returns a new {@link HBaseIO.Read} that will filter the rows read from HBase + * using the given row filter. + * + * <p>Does not modify this object. + */ + public Read withFilter(Filter filter) { + checkNotNull(filter, "filter"); + return withScan(serializableScan.getScan().setFilter(filter)); + } + + /** + * Returns a new {@link HBaseIO.Read} that will read only rows in the specified range. + * + * <p>Does not modify this object. + */ + public Read withKeyRange(ByteKeyRange keyRange) { + checkNotNull(keyRange, "keyRange"); + byte[] startRow = keyRange.getStartKey().getBytes(); + byte[] stopRow = keyRange.getEndKey().getBytes(); + return withScan(serializableScan.getScan().setStartRow(startRow).setStopRow(stopRow)); + } + + /** + * Returns a new {@link HBaseIO.Read} that will read only rows in the specified range. + * + * <p>Does not modify this object. + */ + public Read withKeyRange(byte[] startRow, byte[] stopRow) { + checkNotNull(startRow, "startRow"); + checkNotNull(stopRow, "stopRow"); + ByteKeyRange keyRange = + ByteKeyRange.of(ByteKey.copyFrom(startRow), ByteKey.copyFrom(stopRow)); + return withKeyRange(keyRange); + } + + private Read(SerializableConfiguration serializableConfiguration, String tableId, + SerializableScan serializableScan) { + this.serializableConfiguration = serializableConfiguration; + this.tableId = tableId; + this.serializableScan = serializableScan; + } + + @Override + public PCollection<Result> expand(PBegin input) { + HBaseSource source = new HBaseSource(this, null /* estimatedSizeBytes */); + return input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source)); + } + + @Override + public void validate(PBegin input) { + checkArgument(serializableConfiguration != null, + "Configuration not provided"); + checkArgument(!tableId.isEmpty(), "Table ID not specified"); + try (Connection connection = ConnectionFactory.createConnection( + serializableConfiguration.getConfiguration())) { + Admin admin = connection.getAdmin(); + checkArgument(admin.tableExists(TableName.valueOf(tableId)), + "Table %s does not exist", tableId); + } catch (IOException e) { + LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e); + } + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("configuration", + serializableConfiguration.getConfiguration().toString())); + builder.add(DisplayData.item("tableId", tableId)); + builder.addIfNotNull(DisplayData.item("scan", serializableScan.getScan().toString())); + } + + public String getTableId() { + return tableId; + } + + public Configuration getConfiguration() { + return serializableConfiguration.getConfiguration(); + } + + /** + * Returns the range of keys that will be read from the table. 
+ */ + public ByteKeyRange getKeyRange() { + byte[] startRow = serializableScan.getScan().getStartRow(); + byte[] stopRow = serializableScan.getScan().getStopRow(); + return ByteKeyRange.of(ByteKey.copyFrom(startRow), ByteKey.copyFrom(stopRow)); + } + + private SerializableConfiguration serializableConfiguration; + private String tableId; + private SerializableScan serializableScan; + } + + static class HBaseSource extends BoundedSource<Result> { + private Read read; + @Nullable private Long estimatedSizeBytes; + + HBaseSource(Read read, @Nullable Long estimatedSizeBytes) { + this.read = read; + this.estimatedSizeBytes = estimatedSizeBytes; + } + + @Override + public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Exception { + if (estimatedSizeBytes == null) { + estimatedSizeBytes = estimateSizeBytes(); + LOG.debug("Estimated size {} bytes for table {} and scan {}", estimatedSizeBytes, + read.tableId, read.serializableScan.getScan()); + } + return estimatedSizeBytes; + } + + /** + * This estimates the real size, it can be the compressed size depending on the HBase + * configuration. + */ + private long estimateSizeBytes() throws Exception { + // This code is based on RegionSizeCalculator in hbase-server + long estimatedSizeBytes = 0L; + Configuration configuration = this.read.serializableConfiguration.getConfiguration(); + try (Connection connection = ConnectionFactory.createConnection(configuration)) { + // filter regions for the given table/scan + List<HRegionLocation> regionLocations = getRegionLocations(connection); + + // builds set of regions who are part of the table scan + Set<byte[]> tableRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR); + for (HRegionLocation regionLocation : regionLocations) { + tableRegions.add(regionLocation.getRegionInfo().getRegionName()); + } + + // calculate estimated size for the regions + Admin admin = connection.getAdmin(); + ClusterStatus clusterStatus = admin.getClusterStatus(); + Collection<ServerName> servers = clusterStatus.getServers(); + for (ServerName serverName : servers) { + ServerLoad serverLoad = clusterStatus.getLoad(serverName); + for (RegionLoad regionLoad : serverLoad.getRegionsLoad().values()) { + byte[] regionId = regionLoad.getName(); + if (tableRegions.contains(regionId)) { + long regionSizeBytes = regionLoad.getStorefileSizeMB() * 1_048_576L; + estimatedSizeBytes += regionSizeBytes; + } + } + } + } + return estimatedSizeBytes; + } + + private List<HRegionLocation> getRegionLocations(Connection connection) throws Exception { + final Scan scan = read.serializableScan.getScan(); + byte[] startRow = scan.getStartRow(); + byte[] stopRow = scan.getStopRow(); + + final List<HRegionLocation> regionLocations = new ArrayList<>(); + + final boolean scanWithNoLowerBound = startRow.length == 0; + final boolean scanWithNoUpperBound = stopRow.length == 0; + + TableName tableName = TableName.valueOf(read.tableId); + RegionLocator regionLocator = connection.getRegionLocator(tableName); + List<HRegionLocation> tableRegionInfos = regionLocator.getAllRegionLocations(); + for (HRegionLocation regionLocation : tableRegionInfos) { + final byte[] startKey = regionLocation.getRegionInfo().getStartKey(); + final byte[] endKey = regionLocation.getRegionInfo().getEndKey(); + boolean isLastRegion = endKey.length == 0; + // filters regions who are part of the scan + if ((scanWithNoLowerBound + || isLastRegion || Bytes.compareTo(startRow, endKey) < 0) + && (scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) { + 
regionLocations.add(regionLocation); + } + } + + return regionLocations; + } + + private List<HBaseSource> + splitBasedOnRegions(List<HRegionLocation> regionLocations, int numSplits) + throws Exception { + final Scan scan = read.serializableScan.getScan(); + byte[] startRow = scan.getStartRow(); + byte[] stopRow = scan.getStopRow(); + + final List<HBaseSource> sources = new ArrayList<>(numSplits); + final boolean scanWithNoLowerBound = startRow.length == 0; + final boolean scanWithNoUpperBound = stopRow.length == 0; + + for (HRegionLocation regionLocation : regionLocations) { + final byte[] startKey = regionLocation.getRegionInfo().getStartKey(); + final byte[] endKey = regionLocation.getRegionInfo().getEndKey(); + boolean isLastRegion = endKey.length == 0; + String host = regionLocation.getHostnamePort(); + + final byte[] splitStart = (scanWithNoLowerBound + || Bytes.compareTo(startKey, startRow) >= 0) ? startKey : startRow; + final byte[] splitStop = + (scanWithNoUpperBound || Bytes.compareTo(endKey, stopRow) <= 0) + && !isLastRegion ? endKey : stopRow; + + LOG.debug("{} {} {} {} {}", sources.size(), host, read.tableId, + Bytes.toString(splitStart), Bytes.toString(splitStop)); + + // We need to create a new copy of the scan and read to add the new ranges + Scan newScan = new Scan(scan).setStartRow(splitStart).setStopRow(splitStop); + Read newRead = new Read(read.serializableConfiguration, read.tableId, + new SerializableScan(newScan)); + sources.add(new HBaseSource(newRead, estimatedSizeBytes)); + } + return sources; + } + + @Override + public List<? extends BoundedSource<Result>> + splitIntoBundles(long desiredBundleSizeBytes, PipelineOptions options) + throws Exception { + LOG.debug("desiredBundleSize {} bytes", desiredBundleSizeBytes); + long estimatedSizeBytes = getEstimatedSizeBytes(options); + int numSplits = 1; + if (estimatedSizeBytes > 0 && desiredBundleSizeBytes > 0) { + numSplits = (int) Math.ceil((double) estimatedSizeBytes / desiredBundleSizeBytes); + } + + try (Connection connection = ConnectionFactory.createConnection( + read.getConfiguration())) { + List<HRegionLocation> regionLocations = getRegionLocations(connection); + int realNumSplits = + numSplits < regionLocations.size() ? 
regionLocations.size() : numSplits; + LOG.debug("Suggested {} bundle(s) based on size", numSplits); + LOG.debug("Suggested {} bundle(s) based on number of regions", + regionLocations.size()); + final List<HBaseSource> sources = splitBasedOnRegions(regionLocations, + realNumSplits); + LOG.debug("Split into {} bundle(s)", sources.size()); + if (numSplits >= 1) { + return sources; + } + return Collections.singletonList(this); + } + } + + @Override + public BoundedReader<Result> createReader(PipelineOptions pipelineOptions) + throws IOException { + return new HBaseReader(this); + } + + @Override + public void validate() { + read.validate(null /* input */); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + read.populateDisplayData(builder); + } + + @Override + public Coder<Result> getDefaultOutputCoder() { + return HBaseResultCoder.of(); + } + } + + private static class HBaseReader extends BoundedSource.BoundedReader<Result> { + private final HBaseSource source; + private Connection connection; + private ResultScanner scanner; + private Iterator<Result> iter; + private Result result; + private long recordsReturned; + + HBaseReader(HBaseSource source) { + this.source = source; + } + + @Override + public boolean start() throws IOException { + Configuration configuration = source.read.serializableConfiguration.getConfiguration(); + String tableId = source.read.tableId; + connection = ConnectionFactory.createConnection(configuration); + TableName tableName = TableName.valueOf(tableId); + Table table = connection.getTable(tableName); + Scan scan = source.read.serializableScan.getScan(); + scanner = table.getScanner(scan); + iter = scanner.iterator(); + return advance(); + } + + @Override + public Result getCurrent() throws NoSuchElementException { + return result; + } + + @Override + public boolean advance() throws IOException { + boolean hasRecord = iter.hasNext(); + if (hasRecord) { + result = iter.next(); + ++recordsReturned; + } + return hasRecord; + } + + @Override + public void close() throws IOException { + LOG.debug("Closing reader after reading {} records.", recordsReturned); + if (scanner != null) { + scanner.close(); + scanner = null; + } + if (connection != null) { + connection.close(); + connection = null; + } + } + + @Override + public BoundedSource<Result> getCurrentSource() { + return source; + } + } + + /** + * Creates an uninitialized {@link HBaseIO.Write}. Before use, the {@code Write} must be + * initialized with a + * {@link HBaseIO.Write#withConfiguration(Configuration)} that specifies + * the destination HBase instance, and a {@link HBaseIO.Write#withTableId tableId} + * that specifies which table to write. + */ + public static Write write() { + return new Write(null /* SerializableConfiguration */, ""); + } + + /** + * A {@link PTransform} that writes to HBase. See the class-level Javadoc on + * {@link HBaseIO} for more information. + * + * @see HBaseIO + */ + public static class Write + extends PTransform<PCollection<KV<ByteString, Iterable<Mutation>>>, PDone> { + + /** + * Returns a new {@link HBaseIO.Write} that will write to the HBase instance + * indicated by the given Configuration, and using any other specified customizations. + * + * <p>Does not modify this object. 
+ */ + public Write withConfiguration(Configuration configuration) { + checkNotNull(configuration, "conf"); + return new Write(new SerializableConfiguration(configuration), tableId); + } + + /** + * Returns a new {@link HBaseIO.Write} that will write to the specified table. + * + * <p>Does not modify this object. + */ + public Write withTableId(String tableId) { + checkNotNull(tableId, "tableId"); + return new Write(serializableConfiguration, tableId); + } + + private Write(SerializableConfiguration serializableConfiguration, String tableId) { + this.serializableConfiguration = serializableConfiguration; + this.tableId = tableId; + } + + @Override + public PDone expand(PCollection<KV<ByteString, Iterable<Mutation>>> input) { + input.apply(ParDo.of(new HBaseWriterFn(tableId, serializableConfiguration))); + return PDone.in(input.getPipeline()); + } + + @Override + public void validate(PCollection<KV<ByteString, Iterable<Mutation>>> input) { + checkArgument(serializableConfiguration != null, "Configuration not specified"); + checkArgument(!tableId.isEmpty(), "Table ID not specified"); + try (Connection connection = ConnectionFactory.createConnection( + serializableConfiguration.getConfiguration())) { + Admin admin = connection.getAdmin(); + checkArgument(admin.tableExists(TableName.valueOf(tableId)), + "Table %s does not exist", tableId); + } catch (IOException e) { + LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e); + } + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("configuration", + serializableConfiguration.getConfiguration().toString())); + builder.add(DisplayData.item("tableId", tableId)); + } + + public String getTableId() { + return tableId; + } + + public Configuration getConfiguration() { + return serializableConfiguration.getConfiguration(); + } + + private final String tableId; + private final SerializableConfiguration serializableConfiguration; + + private class HBaseWriterFn extends DoFn<KV<ByteString, Iterable<Mutation>>, Void> { + + public HBaseWriterFn(String tableId, + SerializableConfiguration serializableConfiguration) { + this.tableId = checkNotNull(tableId, "tableId"); + this.serializableConfiguration = checkNotNull(serializableConfiguration, + "serializableConfiguration"); + } + + @Setup + public void setup() throws Exception { + Configuration configuration = this.serializableConfiguration.getConfiguration(); + connection = ConnectionFactory.createConnection(configuration); + + TableName tableName = TableName.valueOf(tableId); + BufferedMutatorParams params = + new BufferedMutatorParams(tableName); + mutator = connection.getBufferedMutator(params); + + recordsWritten = 0; + } + + @StartBundle + public void startBundle(Context c) throws Exception { + + } + + @ProcessElement + public void processElement(ProcessContext ctx) throws Exception { + KV<ByteString, Iterable<Mutation>> record = ctx.element(); + List<Mutation> mutations = new ArrayList<>(); + for (Mutation mutation : record.getValue()) { + mutations.add(mutation); + ++recordsWritten; + } + mutator.mutate(mutations); + } + + @FinishBundle + public void finishBundle(Context c) throws Exception { + mutator.flush(); + } + + @Teardown + public void tearDown() throws Exception { + if (mutator != null) { + mutator.close(); + mutator = null; + } + if (connection != null) { + connection.close(); + connection = null; + } + LOG.debug("Wrote {} records", recordsWritten); + } + + @Override + 
public void populateDisplayData(DisplayData.Builder builder) { + builder.delegate(Write.this); + } + + private final String tableId; + private final SerializableConfiguration serializableConfiguration; + + private Connection connection; + private BufferedMutator mutator; + + private long recordsWritten; + } + } + + public static final Coder<KV<ByteString, Iterable<Mutation>>> WRITE_CODER = + KvCoder.of(ByteStringCoder.of(), IterableCoder.of(HBaseMutationCoder.of())); +} http://git-wip-us.apache.org/repos/asf/beam/blob/89fae457/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/HBaseMutationCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/HBaseMutationCoder.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/HBaseMutationCoder.java new file mode 100644 index 0000000..a99a943 --- /dev/null +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/HBaseMutationCoder.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hbase.coders; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; + +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; + +/** + * A {@link Coder} that serializes and deserializes the {@link Mutation} objects using {@link + * ProtobufUtil}. 
+ */ +public class HBaseMutationCoder extends AtomicCoder<Mutation> implements Serializable { + private static final HBaseMutationCoder INSTANCE = new HBaseMutationCoder(); + + private HBaseMutationCoder() {} + + public static HBaseMutationCoder of() { + return INSTANCE; + } + + @Override + public void encode(Mutation mutation, OutputStream outStream, + Coder.Context context) throws IOException { + MutationType type = getType(mutation); + MutationProto proto = ProtobufUtil.toMutation(type, mutation); + proto.writeDelimitedTo(outStream); + } + + @Override + public Mutation decode(InputStream inStream, + Coder.Context context) throws IOException { + return ProtobufUtil.toMutation(MutationProto.parseDelimitedFrom(inStream)); + } + + private static MutationType getType(Mutation mutation) { + if (mutation instanceof Put) { + return MutationType.PUT; + } else if (mutation instanceof Delete) { + return MutationType.DELETE; + } else { + // Increment and Append are not idempotent. They should not be used in distributed jobs. + throw new IllegalArgumentException("Only Put and Delete are supported"); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/89fae457/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/HBaseResultCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/HBaseResultCoder.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/HBaseResultCoder.java new file mode 100644 index 0000000..f10a517 --- /dev/null +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/HBaseResultCoder.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hbase.coders; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; + +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; + +/** + * A {@link Coder} that serializes and deserializes the {@link Result} objects using {@link + * ProtobufUtil}. 
+ */ +public class HBaseResultCoder extends AtomicCoder<Result> implements Serializable { + + private static final HBaseResultCoder INSTANCE = new HBaseResultCoder(); + + public static HBaseResultCoder of() { + return INSTANCE; + } + + @Override + public Result decode(InputStream inputStream, Coder.Context context) + throws IOException { + return ProtobufUtil.toResult(ClientProtos.Result.parseDelimitedFrom(inputStream)); + } + + @Override + public void encode(Result value, OutputStream outputStream, Coder.Context context) + throws IOException { + ProtobufUtil.toResult(value).writeDelimitedTo(outputStream); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/89fae457/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/SerializableConfiguration.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/SerializableConfiguration.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/SerializableConfiguration.java new file mode 100644 index 0000000..de479de --- /dev/null +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/SerializableConfiguration.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hbase.coders; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; + +import org.apache.hadoop.conf.Configuration; + +/** + * This is just a wrapper class to serialize Hadoop/HBase {@link Configuration}. 
+ */ +public class SerializableConfiguration implements Serializable { + private transient Configuration conf; + + public SerializableConfiguration(Configuration conf) { + this.conf = conf; + } + + private void writeObject(ObjectOutputStream out) throws IOException { + out.defaultWriteObject(); + conf.write(out); + } + + private void readObject(ObjectInputStream in) throws IOException { + conf = new Configuration(false); + conf.readFields(in); + } + + public Configuration getConfiguration() { + return conf; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/89fae457/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/SerializableScan.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/SerializableScan.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/SerializableScan.java new file mode 100644 index 0000000..96beff9 --- /dev/null +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/SerializableScan.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hbase.coders; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; + +/** + * This is just a wrapper class to serialize HBase {@link Scan}. + */ +public class SerializableScan implements Serializable { + private transient Scan scan; + + public SerializableScan(Scan scan) { + this.scan = scan; + } + + private void writeObject(ObjectOutputStream out) throws IOException { + ProtobufUtil.toScan(scan).writeDelimitedTo(out); + } + + private void readObject(ObjectInputStream in) throws IOException { + scan = ProtobufUtil.toScan(ClientProtos.Scan.parseDelimitedFrom(in)); + } + + public Scan getScan() { + return scan; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/89fae457/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/package-info.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/package-info.java new file mode 100644 index 0000000..d21b927 --- /dev/null +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/coders/package-info.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Defines coders used while reading and writing from/to HBase. + * + * @see org.apache.beam.sdk.io.hbase.HBaseIO + */ +package org.apache.beam.sdk.io.hbase.coders; http://git-wip-us.apache.org/repos/asf/beam/blob/89fae457/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/package-info.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/package-info.java new file mode 100644 index 0000000..95c1ea7 --- /dev/null +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/package-info.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Defines transforms for reading and writing from HBase. + * + * @see org.apache.beam.sdk.io.hbase.HBaseIO + */ +package org.apache.beam.sdk.io.hbase; http://git-wip-us.apache.org/repos/asf/beam/blob/89fae457/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java new file mode 100644 index 0000000..d3bc4dc --- /dev/null +++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java @@ -0,0 +1,430 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hbase; + +import static org.apache.beam.sdk.testing.SourceTestUtils.assertSourcesEqualReferenceSource; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.hamcrest.Matchers.hasSize; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; + +import com.google.protobuf.ByteString; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.hbase.HBaseIO.HBaseSource; +import org.apache.beam.sdk.io.range.ByteKey; +import org.apache.beam.sdk.io.range.ByteKeyRange; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.RegexStringComparator; +import org.apache.hadoop.hbase.filter.RowFilter; +import org.apache.hadoop.hbase.util.Bytes; +import org.hamcrest.Matchers; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Test HBaseIO. 
+ */ +@RunWith(JUnit4.class) +public class HBaseIOTest { + @Rule public final transient TestPipeline p = TestPipeline.create(); + @Rule public ExpectedException thrown = ExpectedException.none(); + + private static HBaseTestingUtility htu; + private static HBaseAdmin admin; + + private static Configuration conf = HBaseConfiguration.create(); + private static final byte[] COLUMN_FAMILY = Bytes.toBytes("info"); + private static final byte[] COLUMN_NAME = Bytes.toBytes("name"); + private static final byte[] COLUMN_EMAIL = Bytes.toBytes("email"); + + @BeforeClass + public static void beforeClass() throws Exception { + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + // Try to bind the hostname to localhost to solve an issue when it is not configured or + // no DNS resolution available. + conf.setStrings("hbase.master.hostname", "localhost"); + conf.setStrings("hbase.regionserver.hostname", "localhost"); + htu = new HBaseTestingUtility(conf); + htu.startMiniCluster(1, 4); + admin = htu.getHBaseAdmin(); + } + + @AfterClass + public static void afterClass() throws Exception { + if (admin != null) { + admin.close(); + admin = null; + } + if (htu != null) { + htu.shutdownMiniCluster(); + htu = null; + } + } + + @Test + public void testReadBuildsCorrectly() { + HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId("table"); + assertEquals("table", read.getTableId()); + assertNotNull("configuration", read.getConfiguration()); + } + + @Test + public void testReadBuildsCorrectlyInDifferentOrder() { + HBaseIO.Read read = HBaseIO.read().withTableId("table").withConfiguration(conf); + assertEquals("table", read.getTableId()); + assertNotNull("configuration", read.getConfiguration()); + } + + @Test + public void testWriteBuildsCorrectly() { + HBaseIO.Write write = HBaseIO.write().withConfiguration(conf).withTableId("table"); + assertEquals("table", write.getTableId()); + assertNotNull("configuration", write.getConfiguration()); + } + + @Test + public void testWriteBuildsCorrectlyInDifferentOrder() { + HBaseIO.Write write = HBaseIO.write().withTableId("table").withConfiguration(conf); + assertEquals("table", write.getTableId()); + assertNotNull("configuration", write.getConfiguration()); + } + + @Test + public void testWriteValidationFailsMissingTable() { + HBaseIO.Write write = HBaseIO.write().withConfiguration(conf); + thrown.expect(IllegalArgumentException.class); + write.validate(null /* input */); + } + + @Test + public void testWriteValidationFailsMissingConfiguration() { + HBaseIO.Write write = HBaseIO.write().withTableId("table"); + thrown.expect(IllegalArgumentException.class); + write.validate(null /* input */); + } + + /** Tests that when reading from a non-existent table, the read fails. */ + @Test + @Category(NeedsRunner.class) + public void testReadingFailsTableDoesNotExist() throws Exception { + final String table = "TEST-TABLE-INVALID"; + // Exception will be thrown by read.validate() when read is applied. + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage(String.format("Table %s does not exist", table)); + runReadTest(HBaseIO.read().withConfiguration(conf).withTableId(table), + new ArrayList<Result>()); + } + + /** Tests that when reading from an empty table, the read succeeds. 
*/ + @Test + @Category(NeedsRunner.class) + public void testReadingEmptyTable() throws Exception { + final String table = "TEST-EMPTY-TABLE"; + createTable(table); + runReadTest(HBaseIO.read().withConfiguration(conf).withTableId(table), + new ArrayList<Result>()); + } + + @Test + @Category(NeedsRunner.class) + public void testReading() throws Exception { + final String table = "TEST-MANY-ROWS-TABLE"; + final int numRows = 1001; + createTable(table); + writeData(table, numRows); + runReadTestLength(HBaseIO.read().withConfiguration(conf).withTableId(table), 1001); + } + + /** Tests reading all rows from a split table. */ + @Test + public void testReadingWithSplits() throws Exception { + final String table = "TEST-MANY-ROWS-SPLITS-TABLE"; + final int numRows = 1500; + final int numRegions = 4; + final long bytesPerRow = 100L; + + // Set up test table data and sample row keys for size estimation and splitting. + createTable(table); + writeData(table, numRows); + + HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId(table); + HBaseSource source = new HBaseSource(read, null /* estimatedSizeBytes */); + List<? extends BoundedSource<Result>> splits = + source.splitIntoBundles(numRows * bytesPerRow / numRegions, + null /* options */); + + // Test num splits and split equality. + assertThat(splits, hasSize(4)); + assertSourcesEqualReferenceSource(source, splits, null /* options */); + } + + + /** Tests reading all rows using a filter. */ + @Test + @Category(NeedsRunner.class) + public void testReadingWithFilter() throws Exception { + final String table = "TEST-FILTER-TABLE"; + final int numRows = 1001; + + createTable(table); + writeData(table, numRows); + + String regex = ".*17.*"; + Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, + new RegexStringComparator(regex)); + HBaseIO.Read read = + HBaseIO.read().withConfiguration(conf).withTableId(table).withFilter(filter); + runReadTestLength(read, 20); + } + + /** + * Tests reading all rows using key ranges. Tests a prefix [), a suffix (], and a restricted + * range [] and that some properties hold across them. + */ + @Test + @Category(NeedsRunner.class) + public void testReadingWithKeyRange() throws Exception { + final String table = "TEST-KEY-RANGE-TABLE"; + final int numRows = 1001; + final byte[] startRow = "2".getBytes(); + final byte[] stopRow = "9".getBytes(); + final ByteKey startKey = ByteKey.copyFrom(startRow); + + createTable(table); + writeData(table, numRows); + + // Test prefix: [beginning, startKey). + final ByteKeyRange prefixRange = ByteKeyRange.ALL_KEYS.withEndKey(startKey); + runReadTestLength(HBaseIO.read().withConfiguration(conf).withTableId(table) + .withKeyRange(prefixRange), 126); + + // Test suffix: [startKey, end). + final ByteKeyRange suffixRange = ByteKeyRange.ALL_KEYS.withStartKey(startKey); + runReadTestLength(HBaseIO.read().withConfiguration(conf).withTableId(table) + .withKeyRange(suffixRange), 875); + + // Test restricted range: [startKey, endKey). 
+        // This one tests the second signature of .withKeyRange
+        runReadTestLength(HBaseIO.read().withConfiguration(conf).withTableId(table)
+                .withKeyRange(startRow, stopRow), 441);
+    }
+
+    @Test
+    public void testReadingDisplayData() {
+        HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId("fooTable");
+        DisplayData displayData = DisplayData.from(read);
+        assertThat(displayData, hasDisplayItem("tableId", "fooTable"));
+        assertThat(displayData, hasDisplayItem("configuration"));
+    }
+
+    /** Tests that a record gets written to the table. */
+    @Test
+    @Category(NeedsRunner.class)
+    public void testWriting() throws Exception {
+        final String table = "table";
+        final String key = "key";
+        final String value = "value";
+
+        createTable(table);
+
+        p.apply("single row", Create.of(makeWrite(key, value)).withCoder(HBaseIO.WRITE_CODER))
+                .apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table));
+        p.run().waitUntilFinish();
+
+        List<Result> results = readTable(table, new Scan());
+        assertEquals(1, results.size());
+    }
+
+    /** Tests that when writing to a non-existent table, the write fails. */
+    @Test
+    public void testWritingFailsTableDoesNotExist() throws Exception {
+        final String table = "TEST-TABLE";
+
+        PCollection<KV<ByteString, Iterable<Mutation>>> emptyInput =
+                p.apply(Create.empty(HBaseIO.WRITE_CODER));
+
+        // Exception will be thrown by write.validate() when write is applied.
+        thrown.expect(IllegalArgumentException.class);
+        thrown.expectMessage(String.format("Table %s does not exist", table));
+
+        emptyInput.apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table));
+    }
+
+    /** Tests that when writing an element fails, the write fails. */
+    @Test
+    @Category(NeedsRunner.class)
+    public void testWritingFailsBadElement() throws Exception {
+        final String table = "TEST-TABLE";
+        final String key = "KEY";
+        createTable(table);
+
+        p.apply(Create.of(makeBadWrite(key)).withCoder(HBaseIO.WRITE_CODER))
+                .apply(HBaseIO.write().withConfiguration(conf).withTableId(table));
+
+        thrown.expect(Pipeline.PipelineExecutionException.class);
+        thrown.expectCause(Matchers.<Throwable>instanceOf(IllegalArgumentException.class));
+        thrown.expectMessage("No columns to insert");
+        p.run().waitUntilFinish();
+    }
+
+    @Test
+    public void testWritingDisplayData() {
+        HBaseIO.Write write = HBaseIO.write().withTableId("fooTable").withConfiguration(conf);
+        DisplayData displayData = DisplayData.from(write);
+        assertThat(displayData, hasDisplayItem("tableId", "fooTable"));
+    }
+
+    // HBase helper methods
+    private static void createTable(String tableId) throws Exception {
+        byte[][] splitKeys = {"4".getBytes(), "8".getBytes(), "C".getBytes()};
+        createTable(tableId, COLUMN_FAMILY, splitKeys);
+    }
+
+    private static void createTable(String tableId, byte[] columnFamily, byte[][] splitKeys)
+            throws Exception {
+        TableName tableName = TableName.valueOf(tableId);
+        HTableDescriptor desc = new HTableDescriptor(tableName);
+        HColumnDescriptor colDef = new HColumnDescriptor(columnFamily);
+        desc.addFamily(colDef);
+        admin.createTable(desc, splitKeys);
+    }
+
+    /**
+     * Helper function to write a number of test rows to the given table.
+     */
+    private static void writeData(String tableId, int numRows) throws Exception {
+        Connection connection = admin.getConnection();
+        TableName tableName = TableName.valueOf(tableId);
+        BufferedMutator mutator = connection.getBufferedMutator(tableName);
+        List<Mutation> mutations = makeTableData(numRows);
+        mutator.mutate(mutations);
+        mutator.flush();
+        mutator.close();
+    }
+
+    private static List<Mutation> makeTableData(int numRows) {
+        List<Mutation> mutations = new ArrayList<>(numRows);
+        for (int i = 0; i < numRows; ++i) {
+            // We pad values in hex order 0,1, ... ,F,0, ...
+            String prefix = String.format("%X", i % 16);
+            // This 21 is to have a key longer than an input
+            byte[] rowKey = Bytes.toBytes(
+                    StringUtils.leftPad("_" + String.valueOf(i), 21, prefix));
+            byte[] value = Bytes.toBytes(String.valueOf(i));
+            byte[] valueEmail = Bytes.toBytes(String.valueOf(i) + "@email.com");
+            mutations.add(new Put(rowKey).addColumn(COLUMN_FAMILY, COLUMN_NAME, value));
+            mutations.add(new Put(rowKey).addColumn(COLUMN_FAMILY, COLUMN_EMAIL, valueEmail));
+        }
+        return mutations;
+    }
+
+    private static ResultScanner scanTable(String tableId, Scan scan) throws Exception {
+        Connection connection = ConnectionFactory.createConnection(conf);
+        TableName tableName = TableName.valueOf(tableId);
+        Table table = connection.getTable(tableName);
+        return table.getScanner(scan);
+    }
+
+    private static List<Result> readTable(String tableId, Scan scan) throws Exception {
+        ResultScanner scanner = scanTable(tableId, scan);
+        List<Result> results = new ArrayList<>();
+        for (Result result : scanner) {
+            results.add(result);
+        }
+        scanner.close();
+        return results;
+    }
+
+    // Beam helper methods
+    /** Helper function to make a single row mutation to be written. */
+    private static KV<ByteString, Iterable<Mutation>> makeWrite(String key, String value) {
+        ByteString rowKey = ByteString.copyFromUtf8(key);
+        List<Mutation> mutations = new ArrayList<>();
+        mutations.add(makeMutation(key, value));
+        return KV.of(rowKey, (Iterable<Mutation>) mutations);
+    }
+
+
+    private static Mutation makeMutation(String key, String value) {
+        ByteString rowKey = ByteString.copyFromUtf8(key);
+        return new Put(rowKey.toByteArray())
+                .addColumn(COLUMN_FAMILY, COLUMN_NAME, Bytes.toBytes(value))
+                .addColumn(COLUMN_FAMILY, COLUMN_EMAIL, Bytes.toBytes(value + "@email.com"));
+    }
+
+    private static KV<ByteString, Iterable<Mutation>> makeBadWrite(String key) {
+        Put put = new Put(key.getBytes());
+        List<Mutation> mutations = new ArrayList<>();
+        mutations.add(put);
+        return KV.of(ByteString.copyFromUtf8(key), (Iterable<Mutation>) mutations);
+    }
+
+    private void runReadTest(HBaseIO.Read read, List<Result> expected) {
+        final String transformId = read.getTableId() + "_" + read.getKeyRange();
+        PCollection<Result> rows = p.apply("Read" + transformId, read);
+        PAssert.that(rows).containsInAnyOrder(expected);
+        p.run().waitUntilFinish();
+    }
+
+    private void runReadTestLength(HBaseIO.Read read, long numElements) {
+        final String transformId = read.getTableId() + "_" + read.getKeyRange();
+        PCollection<Result> rows = p.apply("Read" + transformId, read);
+        PAssert.thatSingleton(rows.apply("Count" + transformId,
+                Count.<Result>globally())).isEqualTo(numElements);
+        p.run().waitUntilFinish();
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/89fae457/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 70ccf9d..b518b70
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -33,14 +33,15 @@
     (sources and sinks) to consume and produce data from systems.</description>
 
   <modules>
+    <module>elasticsearch</module>
     <module>google-cloud-platform</module>
+    <module>hbase</module>
     <module>hdfs</module>
+    <module>jdbc</module>
     <module>jms</module>
     <module>kafka</module>
     <module>kinesis</module>
     <module>mongodb</module>
-    <module>jdbc</module>
-    <module>elasticsearch</module>
     <module>mqtt</module>
   </modules>

http://git-wip-us.apache.org/repos/asf/beam/blob/89fae457/sdks/java/javadoc/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/javadoc/pom.xml b/sdks/java/javadoc/pom.xml
index d39958d..3f5e8cc 100644
--- a/sdks/java/javadoc/pom.xml
+++ b/sdks/java/javadoc/pom.xml
@@ -114,6 +114,11 @@
     <dependency>
       <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-hbase</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-io-hdfs</artifactId>
     </dependency>
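
For reviewers who want to see the new transforms outside the test harness, below is a minimal, hypothetical usage sketch assembled only from the API calls exercised in HBaseIOTest above. The table name "my-table", the row contents, and the import locations (in particular which ByteString class is used) are illustrative assumptions, not part of this commit.

import java.util.Collections;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.hbase.HBaseIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

import com.google.protobuf.ByteString;

public class HBaseIOUsageSketch {
    public static void main(String[] args) {
        // The Configuration must point at the target HBase cluster (e.g. zookeeper quorum).
        Configuration conf = HBaseConfiguration.create();
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Read: produces one HBase Result per row of the given table.
        // Further transforms would consume 'rows'.
        PCollection<Result> rows = pipeline.apply("ReadFromHBase",
                HBaseIO.read().withConfiguration(conf).withTableId("my-table"));

        // Write: each element is a row key paired with the mutations to apply to that row,
        // encoded with HBaseIO.WRITE_CODER.
        Mutation put = new Put(Bytes.toBytes("row-1"))
                .addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("value-1"));
        pipeline.apply("CreateRow",
                Create.of(KV.of(ByteString.copyFromUtf8("row-1"),
                        (Iterable<Mutation>) Collections.<Mutation>singletonList(put)))
                        .withCoder(HBaseIO.WRITE_CODER))
                .apply("WriteToHBase",
                        HBaseIO.write().withConfiguration(conf).withTableId("my-table"));

        pipeline.run().waitUntilFinish();
    }
}

The write side mirrors the makeWrite() helper in the test: KV of row key to an Iterable of Mutations, with HBaseIO.WRITE_CODER set explicitly on the Create transform.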
