This is an automated email from the ASF dual-hosted git repository.
mihir6692 pushed a commit to branch 4.14-HBase-1.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.14-HBase-1.2 by this push:
new 0a7e93d PHOENIX-5055 Split mutations batches probably affects
correctness of index data
0a7e93d is described below
commit 0a7e93d4929e3549c4a640b5e08cd6b00a5d4c9e
Author: Jaanai <[email protected]>
AuthorDate: Sat May 11 11:13:38 2019 +0530
PHOENIX-5055 Split mutations batches probably affects correctness of index
data
---
.../apache/phoenix/end2end/MutationStateIT.java | 47 +++++++++++++++++++++-
.../org/apache/phoenix/end2end/QueryMoreIT.java | 6 +--
.../org/apache/phoenix/execute/MutationState.java | 41 ++++++++++++++-----
.../apache/phoenix/execute/MutationStateTest.java | 41 +++++++++++++++++++
4 files changed, 122 insertions(+), 13 deletions(-)
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
index 7f4d9a4..3f3c314 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
@@ -26,6 +26,7 @@ import java.math.BigInteger;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
+import java.util.Iterator;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
@@ -35,6 +36,11 @@ import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -447,5 +453,44 @@ public class MutationStateIT extends
ParallelStatsDisabledIT {
stmt.execute();
assertTrue("Mutation state size should decrease", prevEstimatedSize+4
> state.getEstimatedSize());
}
-
+
+ @Test // checks that, after one commit, all cells of each data-table row carry the same timestamp
+ public void testSplitMutationsIntoSameGroupForSingleRow() throws Exception
{
+ String tableName = "TBL_" + generateUniqueName();
+ String indexName = "IDX_" + generateUniqueName();
+ Properties props = new Properties();
+ props.put("phoenix.mutate.batchSize", "2"); // NOTE(review): presumably the row-count limit passed to getMutationBatchList (which requires > 1) - confirm
+ try (PhoenixConnection conn = DriverManager.getConnection(getUrl(),
props).unwrap(PhoenixConnection.class)) {
+ conn.setAutoCommit(false); // upserts below are buffered until the explicit commit
+ conn.createStatement().executeUpdate(
+ "CREATE TABLE " + tableName + " ("
+ + "A VARCHAR NOT NULL PRIMARY KEY,"
+ + "B VARCHAR,"
+ + "C VARCHAR,"
+ + "D VARCHAR) COLUMN_ENCODED_BYTES = 0");
+ conn.createStatement().executeUpdate("CREATE INDEX " + indexName +
" on " + tableName + " (C) INCLUDE(D)");
+
+ conn.createStatement().executeUpdate("UPSERT INTO " + tableName +
"(A,B,C,D) VALUES ('A2','B2','C2','D2')");
+ conn.createStatement().executeUpdate("UPSERT INTO " + tableName +
"(A,B,C,D) VALUES ('A3','B3', 'C3', null)");
+ conn.commit(); // both upserts go out in this single commit
+
+ Table htable =
conn.getQueryServices().getTable(Bytes.toBytes(tableName));
+ Scan scan = new Scan();
+ scan.setRaw(true); // raw scan; NOTE(review): presumably so delete markers (e.g. for the null D column) are returned too - confirm
+ Iterator<Result> scannerIter = htable.getScanner(scan).iterator(); // NOTE(review): the scanner itself is never closed
+ while (scannerIter.hasNext()) {
+ long ts = -1; // -1 means no cell of the current row has been seen yet
+ Result r = scannerIter.next();
+ for (Cell cell : r.listCells()) {
+ if (ts == -1) {
+ ts = cell.getTimestamp(); // the first cell fixes the reference timestamp for its row
+ } else { // every later cell of the row must match that reference timestamp
+ assertEquals("(" + cell.toString() + ") has different
ts", ts, cell.getTimestamp());
+ }
+ }
+ }
+ htable.close(); // NOTE(review): not in a finally/try-with-resources, so it is skipped when an assertion above fails
+ }
+ }
+
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
index 04272fa..6c8064d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
@@ -490,14 +490,14 @@ public class QueryMoreIT extends ParallelStatsDisabledIT {
connection.commit();
assertEquals(2L, connection.getMutationState().getBatchCount());
- // set the batch size (rows) to 1
-
connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, "1");
+ // set the batch size (rows) to 2 since there are at least 2 mutations
when updating a single row
+
connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, "2");
connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB,
"128");
connection = (PhoenixConnection) DriverManager.getConnection(getUrl(),
connectionProperties);
upsertRows(connection, fullTableName);
connection.commit();
// each row should be in its own batch
- assertEquals(4L, connection.getMutationState().getBatchCount());
+ assertEquals(2L, connection.getMutationState().getBatchCount());
}
private void upsertRows(PhoenixConnection conn, String fullTableName)
throws SQLException {
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 289adac..e54e892 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -102,6 +102,7 @@ import org.apache.phoenix.util.TransactionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
@@ -1137,34 +1138,56 @@ public class MutationState implements SQLCloseable {
}
/**
- * Split the list of mutations into multiple lists that don't exceed row
and byte thresholds
+ *
+ * Split the list of mutations into multiple lists. Since a single row
update can contain multiple mutations,
+ * we only check if the current batch has exceeded the row or size limit
for different rows,
+ * so that mutations for a single row don't end up in different batches. Only adjacent mutations with an equal row key are grouped; a group that alone exceeds a limit still forms one batch.
*
* @param allMutationList
* List of HBase mutations
* @return List of lists of mutations
*/
- public static List<List<Mutation>> getMutationBatchList(long batchSize,
long batchSizeBytes,
- List<Mutation> allMutationList) {
+ public static List<List<Mutation>> getMutationBatchList(long batchSize,
long batchSizeBytes, List<Mutation> allMutationList) {
+ Preconditions.checkArgument(batchSize> 1, // a batchSize of 1 or less is rejected with IllegalArgumentException
+ "Mutation types are put or delete, for one row all mutations
must be in one batch.");
+ Preconditions.checkArgument(batchSizeBytes > 0, "Batch size must be
larger than 0");
List<List<Mutation>> mutationBatchList = Lists.newArrayList();
List<Mutation> currentList = Lists.newArrayList();
+ List<Mutation> sameRowList = Lists.newArrayList(); // scratch buffer for the current row's mutations; cleared after each group
long currentBatchSizeBytes = 0L;
- for (Mutation mutation : allMutationList) {
- long mutationSizeBytes =
KeyValueUtil.calculateMutationDiskSize(mutation);
- if (currentList.size() == batchSize || currentBatchSizeBytes +
mutationSizeBytes > batchSizeBytes) {
+ for (int i = 0; i < allMutationList.size(); ) { // no update clause: i is advanced in the body, once per consumed mutation
+ long sameRowBatchSize = 1L; // number of mutations in the current same-row group
+ Mutation mutation = allMutationList.get(i);
+ long sameRowMutationSizeBytes =
KeyValueUtil.calculateMutationDiskSize(mutation);
+ sameRowList.add(mutation);
+ while (i + 1 < allMutationList.size() && // absorb the following mutations while they share this row key
+ Bytes.compareTo(allMutationList.get(i + 1).getRow(),
mutation.getRow()) == 0) {
+ Mutation sameRowMutation = allMutationList.get(i + 1);
+ sameRowList.add(sameRowMutation);
+ sameRowMutationSizeBytes +=
KeyValueUtil.calculateMutationDiskSize(sameRowMutation);
+ sameRowBatchSize++;
+ i++;
+ }
+
+ if (currentList.size() + sameRowBatchSize > batchSize || // adding the group would exceed a limit: flush the current batch first
+ currentBatchSizeBytes + sameRowMutationSizeBytes >
batchSizeBytes) {
if (currentList.size() > 0) {
mutationBatchList.add(currentList);
currentList = Lists.newArrayList();
currentBatchSizeBytes = 0L;
}
}
- currentList.add(mutation);
- currentBatchSizeBytes += mutationSizeBytes;
+
+ currentList.addAll(sameRowList); // the whole group is appended unconditionally, so a row is never split
+ currentBatchSizeBytes += sameRowMutationSizeBytes;
+ sameRowList.clear();
+ i++; // step past the last mutation of this group
}
+
+ if (currentList.size() > 0) { // emit the trailing, not yet flushed batch
+ mutationBatchList.add(currentList);
+ }
return mutationBatchList;
-
}
public byte[] encodeTransaction() throws SQLException {
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
index 8553b73..22662b2 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
@@ -29,6 +29,9 @@ import java.util.List;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.schema.types.PUnsignedInt;
@@ -36,6 +39,8 @@ import org.apache.phoenix.schema.types.PVarchar;
import org.apache.phoenix.util.PhoenixRuntime;
import org.junit.Test;
+import com.google.common.collect.ImmutableList;
+
public class MutationStateTest {
@Test
@@ -134,4 +139,40 @@ public class MutationStateTest {
assertTrue("app2".equals(PVarchar.INSTANCE.toObject(CellUtil.cloneValue(keyValues2.get(1)))));
}
+
+ @Test // checks that adjacent mutations on the same row are kept in one batch by getMutationBatchList
+ public void testGetMutationBatchList() {
+ byte[] r1 = Bytes.toBytes(1);
+ byte[] r2 = Bytes.toBytes(2);
+ byte[] r3 = Bytes.toBytes(3);
+ byte[] r4 = Bytes.toBytes(4);
+ // a put and a delete on the same row form one group that must stay together
+ { // case 1: the same-row pair (r2) is at the end of the list
+ List<Mutation> list = ImmutableList.of(new Put(r1), new Put(r2),
new Delete(r2));
+ List<List<Mutation>> batchLists =
MutationState.getMutationBatchList(2, 10, list);
+ assertTrue(batchLists.size() == 2); // expected batches: [Put r1], [Put r2, Delete r2]
+ assertEquals(batchLists.get(0).size(), 1); // NOTE(review): arguments here and below are in (actual, expected) order
+ assertEquals(batchLists.get(1).size(), 2);
+ }
+
+ { // case 2: the same-row pair (r1) is at the start of the list
+ List<Mutation> list = ImmutableList.of(new Put(r1), new
Delete(r1), new Put(r2));
+ List<List<Mutation>> batchLists =
MutationState.getMutationBatchList(2, 10, list);
+ assertTrue(batchLists.size() == 2); // expected batches: [Put r1, Delete r1], [Put r2]
+ assertEquals(batchLists.get(0).size(), 2);
+ assertEquals(batchLists.get(1).size(), 1);
+ }
+
+ { // case 3: single-mutation rows interleaved with same-row pairs
+ List<Mutation> list = ImmutableList.of(new Put(r3), new Put(r1),
new Delete(r1), new Put(r2), new Put(r4), new Delete(r4));
+ List<List<Mutation>> batchLists =
MutationState.getMutationBatchList(2, 10, list);
+ assertTrue(batchLists.size() == 4); // expected batches: [Put r3], [Put r1, Delete r1], [Put r2], [Put r4, Delete r4]
+ assertEquals(batchLists.get(0).size(), 1);
+ assertEquals(batchLists.get(1).size(), 2);
+ assertEquals(batchLists.get(2).size(), 1);
+ assertEquals(batchLists.get(3).size(), 2);
+ }
+
+ }
+
}