[
https://issues.apache.org/jira/browse/BEAM-4019?focusedWorklogId=92208&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92208
]
ASF GitHub Bot logged work on BEAM-4019:
----------------------------------------
Author: ASF GitHub Bot
Created on: 18/Apr/18 18:36
Start Date: 18/Apr/18 18:36
Worklog Time Spent: 10m
Work Description: iemejia closed pull request #5081: [BEAM-4019] Refactor
HBaseIO splitting to produce ByteKeyRange objects
URL: https://github.com/apache/beam/pull/5081
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
index bcdaefa1498..5b8f00439d6 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
@@ -22,13 +22,10 @@
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.TreeSet;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
@@ -46,11 +43,7 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.RegionLoad;
-import org.apache.hadoop.hbase.ServerLoad;
-import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
@@ -58,13 +51,11 @@
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -228,8 +219,11 @@ private Read(
} catch (IOException e) {
LOG.warn("Error checking whether table {} exists; proceeding.",
tableId, e);
}
- HBaseSource source = new HBaseSource(this, null /* estimatedSizeBytes
*/);
- return
input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source));
+ return input
+ .getPipeline()
+ .apply(
+ org.apache.beam.sdk.io.Read.from(
+ new HBaseSource(this, null /* estimatedSizeBytes */)));
}
@Override
@@ -294,7 +288,11 @@ HBaseSource withEndKey(ByteKey endKey) throws IOException {
@Override
public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws
Exception {
if (estimatedSizeBytes == null) {
- estimatedSizeBytes = estimateSizeBytes();
+ try (Connection connection =
+
ConnectionFactory.createConnection(read.serializableConfiguration.get())) {
+ estimatedSizeBytes =
+ HBaseUtils.estimateSizeBytes(connection, read.tableId,
read.serializableScan.get());
+ }
LOG.debug(
"Estimated size {} bytes for table {} and scan {}",
estimatedSizeBytes,
@@ -304,111 +302,6 @@ public long getEstimatedSizeBytes(PipelineOptions
pipelineOptions) throws Except
return estimatedSizeBytes;
}
- /**
- * This estimates the real size, it can be the compressed size depending
on the HBase
- * configuration.
- */
- private long estimateSizeBytes() throws Exception {
- // This code is based on RegionSizeCalculator in hbase-server
- long estimatedSizeBytes = 0L;
- Configuration configuration = this.read.serializableConfiguration.get();
- try (Connection connection =
ConnectionFactory.createConnection(configuration)) {
- // filter regions for the given table/scan
- List<HRegionLocation> regionLocations = getRegionLocations(connection);
-
- // builds set of regions who are part of the table scan
- Set<byte[]> tableRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
- for (HRegionLocation regionLocation : regionLocations) {
- tableRegions.add(regionLocation.getRegionInfo().getRegionName());
- }
-
- // calculate estimated size for the regions
- Admin admin = connection.getAdmin();
- ClusterStatus clusterStatus = admin.getClusterStatus();
- Collection<ServerName> servers = clusterStatus.getServers();
- for (ServerName serverName : servers) {
- ServerLoad serverLoad = clusterStatus.getLoad(serverName);
- for (RegionLoad regionLoad : serverLoad.getRegionsLoad().values()) {
- byte[] regionId = regionLoad.getName();
- if (tableRegions.contains(regionId)) {
- long regionSizeBytes = regionLoad.getStorefileSizeMB() *
1_048_576L;
- estimatedSizeBytes += regionSizeBytes;
- }
- }
- }
- }
- return estimatedSizeBytes;
- }
-
- private List<HRegionLocation> getRegionLocations(Connection connection)
throws Exception {
- final Scan scan = read.serializableScan.get();
- byte[] startRow = scan.getStartRow();
- byte[] stopRow = scan.getStopRow();
-
- final List<HRegionLocation> regionLocations = new ArrayList<>();
-
- final boolean scanWithNoLowerBound = startRow.length == 0;
- final boolean scanWithNoUpperBound = stopRow.length == 0;
-
- TableName tableName = TableName.valueOf(read.tableId);
- RegionLocator regionLocator = connection.getRegionLocator(tableName);
- List<HRegionLocation> tableRegionInfos =
regionLocator.getAllRegionLocations();
- for (HRegionLocation regionLocation : tableRegionInfos) {
- final byte[] startKey = regionLocation.getRegionInfo().getStartKey();
- final byte[] endKey = regionLocation.getRegionInfo().getEndKey();
- boolean isLastRegion = endKey.length == 0;
- // filters regions who are part of the scan
- if ((scanWithNoLowerBound || isLastRegion || Bytes.compareTo(startRow,
endKey) < 0)
- && (scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) >
0)) {
- regionLocations.add(regionLocation);
- }
- }
-
- return regionLocations;
- }
-
- private List<HBaseSource> splitBasedOnRegions(
- List<HRegionLocation> regionLocations, int numSplits) throws Exception
{
- final Scan scan = read.serializableScan.get();
- byte[] startRow = scan.getStartRow();
- byte[] stopRow = scan.getStopRow();
-
- final List<HBaseSource> sources = new ArrayList<>(numSplits);
- final boolean scanWithNoLowerBound = startRow.length == 0;
- final boolean scanWithNoUpperBound = stopRow.length == 0;
-
- for (HRegionLocation regionLocation : regionLocations) {
- final byte[] startKey = regionLocation.getRegionInfo().getStartKey();
- final byte[] endKey = regionLocation.getRegionInfo().getEndKey();
- boolean isLastRegion = endKey.length == 0;
- String host = regionLocation.getHostnamePort();
-
- final byte[] splitStart =
- (scanWithNoLowerBound || Bytes.compareTo(startKey, startRow) >= 0)
- ? startKey
- : startRow;
- final byte[] splitStop =
- (scanWithNoUpperBound || Bytes.compareTo(endKey, stopRow) <= 0) &&
!isLastRegion
- ? endKey
- : stopRow;
-
- LOG.debug(
- "{} {} {} {} {}",
- sources.size(),
- host,
- read.tableId,
- Bytes.toString(splitStart),
- Bytes.toString(splitStop));
-
- // We need to create a new copy of the scan and read to add the new
ranges
- Scan newScan = new
Scan(scan).setStartRow(splitStart).setStopRow(splitStop);
- Read newRead =
- new Read(read.serializableConfiguration, read.tableId, new
SerializableScan(newScan));
- sources.add(new HBaseSource(newRead, estimatedSizeBytes));
- }
- return sources;
- }
-
@Override
public List<? extends BoundedSource<Result>> split(
long desiredBundleSizeBytes, PipelineOptions options) throws Exception
{
@@ -420,17 +313,38 @@ private long estimateSizeBytes() throws Exception {
}
try (Connection connection =
ConnectionFactory.createConnection(read.getConfiguration())) {
- List<HRegionLocation> regionLocations = getRegionLocations(connection);
- int realNumSplits = numSplits < regionLocations.size() ?
regionLocations.size() : numSplits;
- LOG.debug("Suggested {} bundle(s) based on size", numSplits);
- LOG.debug("Suggested {} bundle(s) based on number of regions",
regionLocations.size());
- final List<HBaseSource> sources = splitBasedOnRegions(regionLocations,
realNumSplits);
- LOG.debug("Split into {} bundle(s)", sources.size());
- if (numSplits >= 1) {
+ List<HRegionLocation> regionLocations =
+ HBaseUtils.getRegionLocations(connection, read.tableId,
read.serializableScan.get());
+ LOG.debug("Suggested {} source(s) based on size", numSplits);
+ LOG.debug("Suggested {} source(s) based on number of regions",
regionLocations.size());
+
+ List<ByteKeyRange> ranges =
+ HBaseUtils.getRanges(
+ regionLocations, read.tableId, read.serializableScan.get());
+ final int numSources = ranges.size();
+ LOG.debug("Spliting into {} source(s)", numSources);
+ if (numSources > 0) {
+ List<HBaseSource> sources = new ArrayList<>(numSources);
+ for (int i = 0; i < numSources; i++) {
+ final ByteKeyRange range = ranges.get(i);
+ LOG.debug("Range {}: {} - {}", i, range.getStartKey(),
range.getEndKey());
+ // We create a new copy of the scan to read from the new ranges
+ sources.add(
+ new HBaseSource(
+ new Read(
+ read.serializableConfiguration,
+ read.tableId,
+ new SerializableScan(
+ new Scan(read.serializableScan.get())
+ .setStartRow(range.getStartKey().getBytes())
+ .setStopRow(range.getEndKey().getBytes()))),
+ estimatedSizeBytes));
+ }
return sources;
}
- return Collections.singletonList(this);
}
+
+ return Collections.singletonList(this);
}
@Override
diff --git
a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseUtils.java
b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseUtils.java
new file mode 100644
index 00000000000..0cf2c504e7a
--- /dev/null
+++
b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseUtils.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import org.apache.beam.sdk.io.range.ByteKey;
+import org.apache.beam.sdk.io.range.ByteKeyRange;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLoad;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/** Utils to interact with an HBase cluster and get information on
tables/regions. */
+class HBaseUtils {
+
+ /**
+ * Estimates the size in bytes that a scan will cover for a given table
based on HBase store file
+ * information. The size can vary between calls because the data can be
compressed based on the
+ * HBase configuration.
+ */
+ static long estimateSizeBytes(Connection connection, String tableId, Scan
scan) throws Exception {
+ // This code is based on RegionSizeCalculator in hbase-server
+ long estimatedSizeBytes = 0L;
+ List<HRegionLocation> regionLocations = getRegionLocations(connection,
tableId, scan);
+
+ // builds set of regions who are part of the table scan
+ Set<byte[]> tableRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+ for (HRegionLocation regionLocation : regionLocations) {
+ tableRegions.add(regionLocation.getRegionInfo().getRegionName());
+ }
+
+ // calculate estimated size for the regions
+ Admin admin = connection.getAdmin();
+ ClusterStatus clusterStatus = admin.getClusterStatus();
+ Collection<ServerName> servers = clusterStatus.getServers();
+ for (ServerName serverName : servers) {
+ ServerLoad serverLoad = clusterStatus.getLoad(serverName);
+ for (RegionLoad regionLoad : serverLoad.getRegionsLoad().values()) {
+ byte[] regionId = regionLoad.getName();
+ if (tableRegions.contains(regionId)) {
+ long regionSizeBytes = regionLoad.getStorefileSizeMB() * 1_048_576L;
+ estimatedSizeBytes += regionSizeBytes;
+ }
+ }
+ }
+
+ return estimatedSizeBytes;
+ }
+
+ /** Returns a list of region locations for a given table and scan. */
+ static List<HRegionLocation> getRegionLocations(Connection connection,
String tableId, Scan scan)
+ throws Exception {
+ byte[] startRow = scan.getStartRow();
+ byte[] stopRow = scan.getStopRow();
+
+ final List<HRegionLocation> regionLocations = new ArrayList<>();
+
+ final boolean scanWithNoLowerBound = startRow.length == 0;
+ final boolean scanWithNoUpperBound = stopRow.length == 0;
+
+ TableName tableName = TableName.valueOf(tableId);
+ RegionLocator regionLocator = connection.getRegionLocator(tableName);
+ List<HRegionLocation> tableRegionInfos =
regionLocator.getAllRegionLocations();
+ for (HRegionLocation regionLocation : tableRegionInfos) {
+ final byte[] startKey = regionLocation.getRegionInfo().getStartKey();
+ final byte[] endKey = regionLocation.getRegionInfo().getEndKey();
+ boolean isLastRegion = endKey.length == 0;
+ // filters regions who are part of the scan
+ if ((scanWithNoLowerBound || isLastRegion || Bytes.compareTo(startRow,
endKey) < 0)
+ && (scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0))
{
+ regionLocations.add(regionLocation);
+ }
+ }
+
+ return regionLocations;
+ }
+
+ /** Returns the list of ranges intersecting a list of regions for the given
table and scan. */
+ static List<ByteKeyRange> getRanges(
+ List<HRegionLocation> regionLocations, String tableId, Scan scan) {
+ byte[] startRow = scan.getStartRow();
+ byte[] stopRow = scan.getStopRow();
+
+ final List<ByteKeyRange> splits = new ArrayList<>();
+ final boolean scanWithNoLowerBound = startRow.length == 0;
+ final boolean scanWithNoUpperBound = stopRow.length == 0;
+
+ for (HRegionLocation regionLocation : regionLocations) {
+ final byte[] startKey = regionLocation.getRegionInfo().getStartKey();
+ final byte[] endKey = regionLocation.getRegionInfo().getEndKey();
+ boolean isLastRegion = endKey.length == 0;
+
+ final byte[] splitStart =
+ (scanWithNoLowerBound || Bytes.compareTo(startKey, startRow) >= 0) ?
startKey : startRow;
+ final byte[] splitStop =
+ (scanWithNoUpperBound || Bytes.compareTo(endKey, stopRow) <= 0) &&
!isLastRegion
+ ? endKey
+ : stopRow;
+ splits.add(ByteKeyRange.of(ByteKey.copyFrom(splitStart),
ByteKey.copyFrom(splitStop)));
+ }
+ return splits;
+ }
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 92208)
Time Spent: 1h 50m (was: 1h 40m)
> Refactor HBaseIO splitting to produce ByteKeyRange objects
> ----------------------------------------------------------
>
> Key: BEAM-4019
> URL: https://issues.apache.org/jira/browse/BEAM-4019
> Project: Beam
> Issue Type: Improvement
> Components: io-java-hbase
> Reporter: Ismaël Mejía
> Assignee: Ismaël Mejía
> Priority: Minor
> Time Spent: 1h 50m
> Remaining Estimate: 0h
>
> This allows to reuse the splitting logic for a future SDF-based
> implementation by reusing it as part of the @SplitRestriction method.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)