[
https://issues.apache.org/jira/browse/PHOENIX-6677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17552339#comment-17552339
]
Kadir Ozdemir edited comment on PHOENIX-6677 at 6/9/22 6:15 PM:
----------------------------------------------------------------
[~stoty], Here are the cluster-level config params and their defaults:
{code:java}
public static final String MAX_MUTATION_SIZE_ATTRIB = "phoenix.mutate.maxSize";
public static final String MAX_MUTATION_SIZE_BYTES_ATTRIB = "phoenix.mutate.maxSizeBytes";
public static final int DEFAULT_MAX_MUTATION_SIZE = 500000;
public static final int DEFAULT_MAX_MUTATION_SIZE_BYTES = 104857600; // 100 MB

public static final String MUTATE_BATCH_SIZE_ATTRIB = "phoenix.mutate.batchSize";
public static final String MUTATE_BATCH_SIZE_BYTES_ATTRIB = "phoenix.mutate.batchSizeBytes";
// Batch size (number of rows) for UPSERT SELECT and DELETE
public final static int DEFAULT_MUTATE_BATCH_SIZE = 100;
// Batch size in bytes for UPSERT SELECT and DELETE. By default, 2 MB
public final static long DEFAULT_MUTATE_BATCH_SIZE_BYTES = 2097152;
{code}
The first set is at the connection level; the second set is at the statement level. As you can see, the statement-level default for the mutation batch size (the number of rows) is 100. Even if we issue sendBatch and then commit, say, every 2000 rows, Phoenix breaks them into 100-row batches by default.
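For illustration, here is a minimal sketch of the kind of client loop I mean; the URL, table, and column names are made up. With the default phoenix.mutate.batchSize of 100, each 2000-row commit below would still be sent to HBase as twenty 100-row batches:
{code:java}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class BatchUpsertSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical URL and table, just for illustration.
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
            conn.setAutoCommit(false);
            try (PreparedStatement stmt =
                    conn.prepareStatement("UPSERT INTO T (ID, V) VALUES (?, ?)")) {
                for (int i = 1; i <= 10000; i++) {
                    stmt.setInt(1, i);
                    stmt.setString(2, "v" + i);
                    stmt.executeUpdate();
                    if (i % 2000 == 0) {
                        // One commit of 2000 buffered rows; Phoenix still splits this into
                        // batches of phoenix.mutate.batchSize (100 by default) before
                        // calling the HBase batch API.
                        conn.commit();
                    }
                }
            }
        }
    }
}
{code}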
Please see the following methods in JDBCUtil:
{code:java}
public static int getMutateBatchSize(String url, Properties info, ReadOnlyProps props)
        throws SQLException {
    String batchSizeStr = findProperty(url, info, PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB);
    return (batchSizeStr == null
            ? props.getInt(QueryServices.MUTATE_BATCH_SIZE_ATTRIB,
                    QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE)
            : Integer.parseInt(batchSizeStr));
}

public static long getMutateBatchSizeBytes(String url, Properties info, ReadOnlyProps props)
        throws SQLException {
    String batchSizeStr = findProperty(url, info, PhoenixRuntime.UPSERT_BATCH_SIZE_BYTES_ATTRIB);
    return (batchSizeStr == null
            ? props.getLong(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB,
                    QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES)
            : Long.parseLong(batchSizeStr));
}
{code}
You can see that it checks the per-connection attributes first and, if none are given, falls back to the cluster-level params. In other words, the above code lets the user override the cluster-level parameters with connection properties.
The following comments in the code are confusing.
{code:java}
/**
* Use this connection property to control the number of rows that are
* batched together on an UPSERT INTO table1... SELECT ... FROM table2.
* It's only used when autoCommit is true and your source table is
* different than your target table or your SELECT statement has a
* GROUP BY clause.
*/
public final static String UPSERT_BATCH_SIZE_ATTRIB = "UpsertBatchSize";
/**
* Use this connection property to control the number of bytes that are
* batched together on an UPSERT INTO table1... SELECT ... FROM table2.
* It's only used when autoCommit is true and your source table is
* different than your target table or your SELECT statement has a
* GROUP BY clause. Overrides the value of UpsertBatchSize.
*/
public final static String UPSERT_BATCH_SIZE_BYTES_ATTRIB = "UpsertBatchSizeBytes";
{code}
I used the following code with auto-commit disabled. UPSERT batches worked as expected (i.e., Phoenix passed 2000-row batches to HBase):
{code:java}
// UPSERT_BATCH_SIZE_ATTRIB and UPSERT_BATCH_SIZE_BYTES_ATTRIB are the
// PhoenixRuntime connection property names shown above.
Properties props = new Properties();
props.setProperty(UPSERT_BATCH_SIZE_BYTES_ATTRIB, "2000000");
props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, "2000");
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
    // ... run the UPSERTs and commit
}
{code}
It is necessary to do more testing and investigation on this to sort things out. I agree with you that the default values (100 and 2097152) are too low and should be increased.
> Parallelism within a batch of mutations
> ----------------------------------------
>
> Key: PHOENIX-6677
> URL: https://issues.apache.org/jira/browse/PHOENIX-6677
> Project: Phoenix
> Issue Type: Improvement
> Reporter: Kadir OZDEMIR
> Priority: Major
>
> Currently, the Phoenix client simply passes the batches of row mutations from the
> application to the HBase client without any parallelism or intelligent grouping
> (except grouping mutations for the same row).
> Assume that the application creates batches of 10000 row mutations for a given
> table. The Phoenix client divides these rows based on their arrival order into
> HBase batches of n (e.g., 100) rows based on the configured batch size, i.e.,
> the number of rows and bytes. Then, Phoenix calls the HBase batch API, one batch
> at a time (i.e., serially). The HBase client further divides a given batch of
> rows into smaller batches based on their regions. This means that a large
> batch created by the application is divided into many tiny batches and
> executed mostly serially. For salted tables, this will result in even smaller
> batches.
> We can improve the current implementation greatly if we group the rows of the
> batch prepared by the application into sub-batches based on table region
> boundaries and then execute these sub-batches in parallel.
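Not part of the ticket text, but as a rough illustration of that idea, here is a sketch written directly against the HBase client API rather than Phoenix internals; the class name, thread count, and the use of one Table handle per task are assumptions made for the example:
{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;

public class ParallelSubBatchSketch {

    static void putInParallel(Connection hbaseConn, TableName tableName,
            List<Put> puts, int threads) throws Exception {
        // Group mutations by the region that owns their row key.
        Map<String, List<Put>> byRegion = new HashMap<>();
        try (RegionLocator locator = hbaseConn.getRegionLocator(tableName)) {
            for (Put put : puts) {
                HRegionLocation loc = locator.getRegionLocation(put.getRow());
                byRegion.computeIfAbsent(loc.getRegion().getEncodedName(),
                        k -> new ArrayList<>()).add(put);
            }
        }
        // Execute one sub-batch per region, in parallel.
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (List<Put> subBatch : byRegion.values()) {
                futures.add(pool.submit(() -> {
                    try (Table table = hbaseConn.getTable(tableName)) {
                        table.put(subBatch);
                    }
                    return null;
                }));
            }
            for (Future<?> f : futures) {
                f.get(); // surface any per-region failure
            }
        } finally {
            pool.shutdown();
        }
    }
}
{code}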