Repository: phoenix Updated Branches: refs/heads/4.x-HBase-1.1 b344d4070 -> 7f3e7b02e
PHOENIX-3784 Chunk commit data using lower of byte-based and row-count limits Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7f3e7b02 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7f3e7b02 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7f3e7b02 Branch: refs/heads/4.x-HBase-1.1 Commit: 7f3e7b02e74b7ec9fdc179f3bad6b613aab3055b Parents: b344d40 Author: Thomas <[email protected]> Authored: Thu May 25 16:18:23 2017 -0700 Committer: Thomas <[email protected]> Committed: Fri May 26 11:00:28 2017 -0700 ---------------------------------------------------------------------- .../org/apache/phoenix/end2end/QueryMoreIT.java | 36 +++++++++++++------- .../UngroupedAggregateRegionObserver.java | 23 +++++++------ .../apache/phoenix/execute/MutationState.java | 16 +++++---- .../org/apache/phoenix/query/QueryServices.java | 1 - .../phoenix/query/QueryServicesOptions.java | 5 ++- .../java/org/apache/phoenix/util/JDBCUtil.java | 1 - .../apache/phoenix/jdbc/PhoenixDriverTest.java | 2 +- 7 files changed, 48 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f3e7b02/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java index a2dab16..bfccb63 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java @@ -28,6 +28,7 @@ import java.sql.Date; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; import java.util.HashMap; @@ -477,6 +478,7 @@ public class QueryMoreIT extends ParallelStatsDisabledIT { @Test public void testMutationBatch() throws Exception { Properties connectionProperties = new Properties(); + connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, "10"); connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, "1024"); PhoenixConnection connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties); String fullTableName = generateUniqueName(); @@ -492,18 +494,28 @@ public class QueryMoreIT extends ParallelStatsDisabledIT { " )\n" + ") MULTI_TENANT=TRUE"); } - PreparedStatement stmt = connection.prepareStatement("upsert into " + fullTableName + - " (organization_id, entity_id, score) values (?,?,?)"); - try { - for (int i = 0; i < 4; i++) { - stmt.setString(1, "AAAA" + i); - stmt.setString(2, "BBBB" + i); - stmt.setInt(3, 1); - stmt.execute(); - } - connection.commit(); - } catch (IllegalArgumentException expected) {} - + upsertRows(connection, fullTableName); + connection.commit(); assertEquals(2L, connection.getMutationState().getBatchCount()); + + // set the batch size (rows) to 1 + connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, "1"); + connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, "1024"); + connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties); + upsertRows(connection, fullTableName); + connection.commit(); + // each row should be in its own batch + assertEquals(4L, connection.getMutationState().getBatchCount()); + } + + private void upsertRows(PhoenixConnection conn, String fullTableName) throws SQLException { + PreparedStatement stmt = conn.prepareStatement("upsert into " + fullTableName + + " (organization_id, entity_id, score) values (?,?,?)"); + for (int i = 0; i < 4; i++) { + stmt.setString(1, "AAAA" + i); + stmt.setString(2, "BBBB" + i); + stmt.setInt(3, 1); + stmt.execute(); + } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f3e7b02/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index a056807..70e74e9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -88,7 +88,6 @@ import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.join.HashJoinInfo; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServicesOptions; -import org.apache.phoenix.schema.ColumnRef; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PRow; import org.apache.phoenix.schema.PTable; @@ -472,7 +471,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver if (logger.isDebugEnabled()) { logger.debug(LogUtil.addCustomAnnotations("Starting ungrouped coprocessor scan " + scan + " "+region.getRegionInfo(), ScanUtil.getCustomAnnotations(scan))); } - long rowCount = 0; + int rowCount = 0; final RegionScanner innerScanner = theScanner; boolean useIndexProto = true; byte[] indexMaintainersPtr = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD); @@ -709,14 +708,14 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } } } - if (readyToCommit(mutations, maxBatchSize, maxBatchSizeBytes)) { + if (readyToCommit(rowCount, mutations.heapSize(), maxBatchSize, maxBatchSizeBytes)) { commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState, areMutationInSameRegion, targetHTable, useIndexProto); mutations.clear(); } // Commit in batches based on UPSERT_BATCH_SIZE_BYTES_ATTRIB in config - if (readyToCommit(indexMutations, maxBatchSize, maxBatchSizeBytes)) { + if (readyToCommit(rowCount, indexMutations.heapSize(), maxBatchSize, maxBatchSizeBytes)) { commitBatch(region, indexMutations, null, blockingMemStoreSize, null, txState, useIndexProto); indexMutations.clear(); @@ -800,9 +799,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } } - private boolean readyToCommit(MutationList mutations, int maxBatchSize, long maxBatchSizeBytes) { - return !mutations.isEmpty() && (maxBatchSize > 0 && mutations.size() > maxBatchSize) - || (maxBatchSizeBytes > 0 && mutations.heapSize() > maxBatchSizeBytes); + private boolean readyToCommit(int rowCount, long mutationSize, int maxBatchSize, long maxBatchSizeBytes) { + return maxBatchSize > 0 && rowCount > maxBatchSize + || (maxBatchSizeBytes > 0 && mutationSize > maxBatchSizeBytes); } @Override @@ -856,10 +855,12 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD); } boolean hasMore; - long rowCount = 0; + int rowCount = 0; try { - int batchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE); - List<Mutation> mutations = Lists.newArrayListWithExpectedSize(batchSize); + int maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE); + long maxBatchSizeBytes = config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB, + QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES); + MutationList mutations = new MutationList(maxBatchSize); region.startRegionOperation(); byte[] uuidValue = ServerCacheClient.generateId(); synchronized (innerScanner) { @@ -893,7 +894,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver del.addDeleteMarker(cell); } } - if (mutations.size() >= batchSize) { + if (readyToCommit(rowCount, mutations.heapSize(), maxBatchSize, maxBatchSizeBytes)) { region.batchMutate(mutations.toArray(new Mutation[mutations.size()]), HConstants.NO_NONCE, HConstants.NO_NONCE); uuidValue = ServerCacheClient.generateId(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f3e7b02/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 6144c7f..37ab7a6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -127,7 +127,8 @@ public class MutationState implements SQLCloseable { private final PhoenixConnection connection; private final long maxSize; - private final long maxSizeBytes; + private final long batchSize; + private final long batchSizeBytes; private long batchCount = 0L; private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations; private final List<TransactionAware> txAwares; @@ -174,7 +175,8 @@ public class MutationState implements SQLCloseable { Transaction tx, TransactionContext txContext) { this.maxSize = maxSize; this.connection = connection; - this.maxSizeBytes = connection.getMutateBatchSizeBytes(); + this.batchSize = connection.getMutateBatchSize(); + this.batchSizeBytes = connection.getMutateBatchSizeBytes(); this.mutations = mutations; boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled(); this.mutationMetricQueue = isMetricsEnabled ? new MutationMetricQueue() @@ -860,8 +862,8 @@ public class MutationState implements SQLCloseable { } } - public long getMaxSizeBytes() { - return maxSizeBytes; + public long getBatchSizeBytes() { + return batchSizeBytes; } public long getBatchCount() { @@ -1095,7 +1097,7 @@ public class MutationState implements SQLCloseable { long startTime = System.currentTimeMillis(); child.addTimelineAnnotation("Attempt " + retryCount); - List<List<Mutation>> mutationBatchList = getMutationBatchList(maxSize, maxSizeBytes, mutationList); + List<List<Mutation>> mutationBatchList = getMutationBatchList(batchSize, batchSizeBytes, mutationList); for (List<Mutation> mutationBatch : mutationBatchList) { hTable.batch(mutationBatch); batchCount++; @@ -1169,13 +1171,13 @@ public class MutationState implements SQLCloseable { * @param allMutationList List of HBase mutations * @return List of lists of mutations */ - public static List<List<Mutation>> getMutationBatchList(long maxSize, long maxSizeBytes, List<Mutation> allMutationList) { + public static List<List<Mutation>> getMutationBatchList(long batchSize, long batchSizeBytes, List<Mutation> allMutationList) { List<List<Mutation>> mutationBatchList = Lists.newArrayList(); List<Mutation> currentList = Lists.newArrayList(); long currentBatchSizeBytes = 0L; for (Mutation mutation : allMutationList) { long mutationSizeBytes = mutation.heapSize(); - if (currentList.size() == maxSize || currentBatchSizeBytes + mutationSizeBytes > maxSizeBytes) { + if (currentList.size() == batchSize || currentBatchSizeBytes + mutationSizeBytes > batchSizeBytes) { if (currentList.size() > 0) { mutationBatchList.add(currentList); currentList = Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f3e7b02/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 331b596..6364ee3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -87,7 +87,6 @@ public interface QueryServices extends SQLCloseable { public static final String SCAN_CACHE_SIZE_ATTRIB = "hbase.client.scanner.caching"; public static final String MAX_MUTATION_SIZE_ATTRIB = "phoenix.mutate.maxSize"; - @Deprecated //USE MUTATE_BATCH_SIZE_BYTES_ATTRIB instead public static final String MUTATE_BATCH_SIZE_ATTRIB = "phoenix.mutate.batchSize"; public static final String MUTATE_BATCH_SIZE_BYTES_ATTRIB = "phoenix.mutate.batchSizeBytes"; public static final String MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB = "phoenix.coprocessor.maxServerCacheTimeToLiveMs"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f3e7b02/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index b1d8a7e..ae7d7aa 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -139,10 +139,9 @@ public class QueryServicesOptions { public static final int DEFAULT_TRACING_BATCH_SIZE = 100; public static final int DEFAULT_TRACING_TRACE_BUFFER_SIZE = 1000; - @Deprecated //use DEFAULT_MUTATE_BATCH_SIZE_BYTES public final static int DEFAULT_MUTATE_BATCH_SIZE = 100; // Batch size for UPSERT SELECT and DELETE - //Batch size in bytes for UPSERT, SELECT and DELETE. By default, 10MB - public final static long DEFAULT_MUTATE_BATCH_SIZE_BYTES = 134217728; + //Batch size in bytes for UPSERT, SELECT and DELETE. By default, 2MB + public final static long DEFAULT_MUTATE_BATCH_SIZE_BYTES = 2097152; // The only downside of it being out-of-sync is that the parallelization of the scan won't be as balanced as it could be. public static final int DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS = 30000; // 30 sec (with no activity) public static final int DEFAULT_SCAN_CACHE_SIZE = 1000; http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f3e7b02/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java index 76d454b..7715705 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java @@ -138,7 +138,6 @@ public class JDBCUtil { return (scnStr == null ? null : Long.parseLong(scnStr)); } - @Deprecated // use getMutateBatchSizeBytes public static int getMutateBatchSize(String url, Properties info, ReadOnlyProps props) throws SQLException { String batchSizeStr = findProperty(url, info, PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB); return (batchSizeStr == null ? props.getInt(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE) : Integer.parseInt(batchSizeStr)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/7f3e7b02/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java index c87c2db..d8f9df6 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java @@ -84,7 +84,7 @@ public class PhoenixDriverTest extends BaseConnectionlessQueryTest { connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB,"100"); PhoenixConnection connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties); assertEquals(100L, connection.getMutateBatchSizeBytes()); - assertEquals(100L, connection.getMutationState().getMaxSizeBytes()); + assertEquals(100L, connection.getMutationState().getBatchSizeBytes()); } @Test
