Repository: phoenix
Updated Branches:
  refs/heads/master 0caecfd90 -> e2ec76559


PHOENIX-3784 Chunk commit data using lower of byte-based and row-count limits


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e2ec7655
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e2ec7655
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e2ec7655

Branch: refs/heads/master
Commit: e2ec76559b1d36fee7b696d7ee2989059e2bbe8c
Parents: 0caecfd
Author: Thomas <[email protected]>
Authored: Thu May 25 16:18:23 2017 -0700
Committer: Thomas <[email protected]>
Committed: Fri May 26 10:58:42 2017 -0700

----------------------------------------------------------------------
 .../org/apache/phoenix/end2end/QueryMoreIT.java | 36 +++++++++++++-------
 .../UngroupedAggregateRegionObserver.java       | 23 +++++++------
 .../apache/phoenix/execute/MutationState.java   | 16 +++++----
 .../org/apache/phoenix/query/QueryServices.java |  1 -
 .../phoenix/query/QueryServicesOptions.java     |  5 ++-
 .../java/org/apache/phoenix/util/JDBCUtil.java  |  1 -
 .../apache/phoenix/jdbc/PhoenixDriverTest.java  |  2 +-
 7 files changed, 48 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2ec7655/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
index a2dab16..bfccb63 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
@@ -28,6 +28,7 @@ import java.sql.Date;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -477,6 +478,7 @@ public class QueryMoreIT extends ParallelStatsDisabledIT {
     @Test
     public void testMutationBatch() throws Exception {
         Properties connectionProperties = new Properties();
+        
connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, "10");
         
connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, 
"1024");
         PhoenixConnection connection = (PhoenixConnection) 
DriverManager.getConnection(getUrl(), connectionProperties);
         String fullTableName = generateUniqueName();
@@ -492,18 +494,28 @@ public class QueryMoreIT extends ParallelStatsDisabledIT {
                 "    )\n" +
                 ") MULTI_TENANT=TRUE");
         }
-        PreparedStatement stmt = connection.prepareStatement("upsert into " + 
fullTableName +
-            " (organization_id, entity_id, score) values (?,?,?)");
-        try {
-            for (int i = 0; i < 4; i++) {
-                stmt.setString(1, "AAAA" + i);
-                stmt.setString(2, "BBBB" + i);
-                stmt.setInt(3, 1);
-                stmt.execute();
-            }
-            connection.commit();
-        } catch (IllegalArgumentException expected) {}
-
+        upsertRows(connection, fullTableName);
+        connection.commit();
         assertEquals(2L, connection.getMutationState().getBatchCount());
+        
+        // Set the row-count batch size to 1 so each of the 4 rows commits in its own batch
+        
connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, "1");
+        
connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, 
"1024");
+        connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), 
connectionProperties);
+        upsertRows(connection, fullTableName);
+        connection.commit();
+        // each row should be in its own batch
+        assertEquals(4L, connection.getMutationState().getBatchCount());
+    }
+
+    private void upsertRows(PhoenixConnection conn, String fullTableName) 
throws SQLException {
+        PreparedStatement stmt = conn.prepareStatement("upsert into " + 
fullTableName +
+                " (organization_id, entity_id, score) values (?,?,?)");
+        for (int i = 0; i < 4; i++) {
+            stmt.setString(1, "AAAA" + i);
+            stmt.setString(2, "BBBB" + i);
+            stmt.setInt(3, 1);
+            stmt.execute();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2ec7655/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index a056807..70e74e9 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -88,7 +88,6 @@ import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PRow;
 import org.apache.phoenix.schema.PTable;
@@ -472,7 +471,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
         if (logger.isDebugEnabled()) {
             logger.debug(LogUtil.addCustomAnnotations("Starting ungrouped 
coprocessor scan " + scan + " "+region.getRegionInfo(), 
ScanUtil.getCustomAnnotations(scan)));
         }
-        long rowCount = 0;
+        int rowCount = 0;
         final RegionScanner innerScanner = theScanner;
         boolean useIndexProto = true;
         byte[] indexMaintainersPtr = 
scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
@@ -709,14 +708,14 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
                                 }
                             }
                         }
-                        if (readyToCommit(mutations, maxBatchSize, 
maxBatchSizeBytes)) {
+                        if (readyToCommit(rowCount, mutations.heapSize(), 
maxBatchSize, maxBatchSizeBytes)) {
                             commit(region, mutations, indexUUID, 
blockingMemStoreSize, indexMaintainersPtr, txState,
                                     areMutationInSameRegion, targetHTable, 
useIndexProto);
                             mutations.clear();
                         }
                         // Commit in batches based on 
UPSERT_BATCH_SIZE_BYTES_ATTRIB in config
 
-                        if (readyToCommit(indexMutations, maxBatchSize, 
maxBatchSizeBytes)) {
+                        if (readyToCommit(rowCount, indexMutations.heapSize(), 
maxBatchSize, maxBatchSizeBytes)) {
                             commitBatch(region, indexMutations, null, 
blockingMemStoreSize, null, txState,
                                     useIndexProto);
                             indexMutations.clear();
@@ -800,9 +799,9 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
         }
     }
 
-    private boolean readyToCommit(MutationList mutations, int maxBatchSize, 
long maxBatchSizeBytes) {
-        return !mutations.isEmpty() && (maxBatchSize > 0 && mutations.size() > 
maxBatchSize)
-                || (maxBatchSizeBytes > 0 && mutations.heapSize() > 
maxBatchSizeBytes);
+    private boolean readyToCommit(int rowCount, long mutationSize, int 
maxBatchSize, long maxBatchSizeBytes) {
+        return maxBatchSize > 0 && rowCount > maxBatchSize
+                || (maxBatchSizeBytes > 0 && mutationSize > maxBatchSizeBytes);
     }
 
     @Override
@@ -856,10 +855,12 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
             indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
         }
         boolean hasMore;
-        long rowCount = 0;
+        int rowCount = 0;
         try {
-            int batchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, 
QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
-            List<Mutation> mutations = 
Lists.newArrayListWithExpectedSize(batchSize);
+            int maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, 
QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+            long maxBatchSizeBytes = 
config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
+                QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
+            MutationList mutations = new MutationList(maxBatchSize);
             region.startRegionOperation();
             byte[] uuidValue = ServerCacheClient.generateId();
             synchronized (innerScanner) {
@@ -893,7 +894,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
                                 del.addDeleteMarker(cell);
                             }
                         }
-                        if (mutations.size() >= batchSize) {
+                        if (readyToCommit(rowCount, mutations.heapSize(), 
maxBatchSize, maxBatchSizeBytes)) {
                             region.batchMutate(mutations.toArray(new 
Mutation[mutations.size()]), HConstants.NO_NONCE,
                                     HConstants.NO_NONCE);
                             uuidValue = ServerCacheClient.generateId();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2ec7655/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 6144c7f..37ab7a6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -127,7 +127,8 @@ public class MutationState implements SQLCloseable {
     
     private final PhoenixConnection connection;
     private final long maxSize;
-    private final long maxSizeBytes;
+    private final long batchSize;
+    private final long batchSizeBytes;
     private long batchCount = 0L;
     private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> 
mutations;
     private final List<TransactionAware> txAwares;
@@ -174,7 +175,8 @@ public class MutationState implements SQLCloseable {
             Transaction tx, TransactionContext txContext) {
         this.maxSize = maxSize;
         this.connection = connection;
-        this.maxSizeBytes = connection.getMutateBatchSizeBytes();
+        this.batchSize = connection.getMutateBatchSize();
+        this.batchSizeBytes = connection.getMutateBatchSizeBytes();
         this.mutations = mutations;
         boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled();
         this.mutationMetricQueue = isMetricsEnabled ? new MutationMetricQueue()
@@ -860,8 +862,8 @@ public class MutationState implements SQLCloseable {
         }
     }
 
-    public long getMaxSizeBytes() {
-        return maxSizeBytes;
+    public long getBatchSizeBytes() {
+        return batchSizeBytes;
     }
 
     public long getBatchCount() {
@@ -1095,7 +1097,7 @@ public class MutationState implements SQLCloseable {
                         
                         long startTime = System.currentTimeMillis();
                         child.addTimelineAnnotation("Attempt " + retryCount);
-                        List<List<Mutation>> mutationBatchList = 
getMutationBatchList(maxSize, maxSizeBytes, mutationList);
+                        List<List<Mutation>> mutationBatchList = 
getMutationBatchList(batchSize, batchSizeBytes, mutationList);
                         for (List<Mutation> mutationBatch : mutationBatchList) 
{
                             hTable.batch(mutationBatch);
                             batchCount++;
@@ -1169,13 +1171,13 @@ public class MutationState implements SQLCloseable {
      * @param allMutationList List of HBase mutations
      * @return List of lists of mutations
      */
-    public static List<List<Mutation>> getMutationBatchList(long maxSize, long 
maxSizeBytes, List<Mutation> allMutationList) {
+    public static List<List<Mutation>> getMutationBatchList(long batchSize, 
long batchSizeBytes, List<Mutation> allMutationList) {
         List<List<Mutation>> mutationBatchList = Lists.newArrayList();
         List<Mutation> currentList = Lists.newArrayList();
         long currentBatchSizeBytes = 0L;
         for (Mutation mutation : allMutationList) {
             long mutationSizeBytes = mutation.heapSize();
-            if (currentList.size() == maxSize || currentBatchSizeBytes + 
mutationSizeBytes > maxSizeBytes) {
+            if (currentList.size() == batchSize || currentBatchSizeBytes + 
mutationSizeBytes > batchSizeBytes) {
                 if (currentList.size() > 0) {
                     mutationBatchList.add(currentList);
                     currentList = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2ec7655/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 331b596..6364ee3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -87,7 +87,6 @@ public interface QueryServices extends SQLCloseable {
     public static final String SCAN_CACHE_SIZE_ATTRIB = 
"hbase.client.scanner.caching";
     public static final String MAX_MUTATION_SIZE_ATTRIB = 
"phoenix.mutate.maxSize";
 
-    @Deprecated //USE MUTATE_BATCH_SIZE_BYTES_ATTRIB instead
     public static final String MUTATE_BATCH_SIZE_ATTRIB = 
"phoenix.mutate.batchSize";
     public static final String MUTATE_BATCH_SIZE_BYTES_ATTRIB = 
"phoenix.mutate.batchSizeBytes";
     public static final String MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB = 
"phoenix.coprocessor.maxServerCacheTimeToLiveMs";

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2ec7655/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index b1d8a7e..ae7d7aa 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -139,10 +139,9 @@ public class QueryServicesOptions {
     public static final int DEFAULT_TRACING_BATCH_SIZE = 100;
     public static final int DEFAULT_TRACING_TRACE_BUFFER_SIZE = 1000;
 
-    @Deprecated //use DEFAULT_MUTATE_BATCH_SIZE_BYTES
     public final static int DEFAULT_MUTATE_BATCH_SIZE = 100; // Batch size for 
UPSERT SELECT and DELETE
-    //Batch size in bytes for UPSERT, SELECT and DELETE. By default, 10MB
-    public final static long DEFAULT_MUTATE_BATCH_SIZE_BYTES = 134217728;
+    //Batch size in bytes for UPSERT SELECT and DELETE. By default, 2MB
+    public final static long DEFAULT_MUTATE_BATCH_SIZE_BYTES = 2097152;
        // The only downside of it being out-of-sync is that the 
parallelization of the scan won't be as balanced as it could be.
     public static final int DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS = 30000; 
// 30 sec (with no activity)
     public static final int DEFAULT_SCAN_CACHE_SIZE = 1000;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2ec7655/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
index 76d454b..7715705 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java
@@ -138,7 +138,6 @@ public class JDBCUtil {
         return (scnStr == null ? null : Long.parseLong(scnStr));
     }
 
-    @Deprecated // use getMutateBatchSizeBytes
     public static int getMutateBatchSize(String url, Properties info, 
ReadOnlyProps props) throws SQLException {
         String batchSizeStr = findProperty(url, info, 
PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB);
         return (batchSizeStr == null ? 
props.getInt(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, 
QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE) : 
Integer.parseInt(batchSizeStr));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2ec7655/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java
index c87c2db..d8f9df6 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java
@@ -84,7 +84,7 @@ public class PhoenixDriverTest extends 
BaseConnectionlessQueryTest {
         
connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB,"100");
         PhoenixConnection connection = (PhoenixConnection) 
DriverManager.getConnection(getUrl(), connectionProperties);
         assertEquals(100L, connection.getMutateBatchSizeBytes());
-        assertEquals(100L, connection.getMutationState().getMaxSizeBytes());
+        assertEquals(100L, connection.getMutationState().getBatchSizeBytes());
     }
     
     @Test

Reply via email to