This is an automated email from the ASF dual-hosted git repository.
stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
     new 5cb18429c7 PHOENIX-6834 Use Pooled HConnection for Server Side Upsert Select
5cb18429c7 is described below
commit 5cb18429c770d734bf4452c40f0ce1cf70719210
Author: Istvan Toth <[email protected]>
AuthorDate: Wed Nov 23 17:31:07 2022 +0100
PHOENIX-6834 Use Pooled HConnection for Server Side Upsert Select
---
.../UngroupedAggregateRegionScanner.java | 41 +++++++++++-----------
1 file changed, 21 insertions(+), 20 deletions(-)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
index e8ec6d758e..bd0bb1e054 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
@@ -34,9 +34,9 @@ import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.SOURCE_OPERATION_ATTRIB;
 import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
-import static org.apache.phoenix.util.WALAnnotationUtil.annotateMutation;
 import static org.apache.phoenix.util.ScanUtil.getPageSizeMsForRegionScanner;
 import static org.apache.phoenix.util.ScanUtil.isDummy;
+import static org.apache.phoenix.util.WALAnnotationUtil.annotateMutation;
 
 import java.io.IOException;
 import java.sql.SQLException;
@@ -45,18 +45,12 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
-import org.apache.phoenix.thirdparty.com.google.common.primitives.Ints;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellBuilderFactory;
 import org.apache.hadoop.hbase.CellBuilderType;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
@@ -69,6 +63,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.cache.GlobalCache;
 import org.apache.phoenix.cache.TenantCache;
 import org.apache.phoenix.exception.DataExceedsCapacityException;
@@ -77,9 +72,13 @@ import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.aggregator.Aggregator;
 import org.apache.phoenix.expression.aggregator.Aggregators;
 import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.memory.InsufficientMemoryException;
 import org.apache.phoenix.memory.MemoryManager;
 import org.apache.phoenix.query.QueryConstants;
@@ -102,20 +101,18 @@ import org.apache.phoenix.schema.types.PChar;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PDouble;
 import org.apache.phoenix.schema.types.PFloat;
-import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
+import org.apache.phoenix.thirdparty.com.google.common.primitives.Ints;
 import org.apache.phoenix.transaction.PhoenixTransactionProvider;
 import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.hbase.index.ValueGetter;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.ExpressionUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.PhoenixKeyValueUtil;
+import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.StringUtil;
@@ -164,7 +161,7 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
     private boolean incrScanRefCount = false;
     private byte[] indexMaintainersPtr;
     private boolean useIndexProto;
-    private Connection targetHConn = null;
+    private PhoenixConnection targetPConn = null;
     public UngroupedAggregateRegionScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
                                            final RegionScanner innerScanner,
                                            final Region region, final Scan scan,
@@ -231,9 +228,13 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
         if (upsertSelectTable != null) {
             isUpsert = true;
             projectedTable = deserializeTable(upsertSelectTable);
-            targetHConn = ConnectionFactory.createConnection(ungroupedAggregateRegionObserver.getUpsertSelectConfig());
-            targetHTable = targetHConn.getTable(
-                    TableName.valueOf(projectedTable.getPhysicalName().getBytes()));
+            targetPConn =
+                    ((PhoenixConnection) QueryUtil.getConnectionOnServer(
+                            ungroupedAggregateRegionObserver.getUpsertSelectConfig()));
+            targetHTable =
+                    targetPConn.getQueryServices()
+                            .getTable(projectedTable.getPhysicalName().getBytes());
+            // TODO Can't we just close the PhoenixConnection immediately here ?
             selectExpressions = deserializeExpressions(scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS));
             values = new byte[projectedTable.getPKColumns().size()][];
             isPKChanging = ExpressionUtil.isPkPositionChanging(new TableRef(projectedTable), selectExpressions);
@@ -313,11 +314,11 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
                     LOGGER.error("Closing table: " + targetHTable + " failed: ", e);
                 }
             }
-            if (targetHConn != null) {
+            if (targetPConn != null) {
                 try {
-                    targetHConn.close();
-                } catch (IOException e) {
-                    LOGGER.error("Closing connection: " + targetHConn + " failed: ", e);
+                    targetPConn.close();
+                } catch (SQLException e) {
+                    LOGGER.error("Closing connection: " + targetPConn + " failed: ", e);
                 }
             }
         } finally {
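
The substance of the patch: rather than opening a fresh HBase Connection per scanner via ConnectionFactory.createConnection(), the coprocessor now obtains a PhoenixConnection through QueryUtil.getConnectionOnServer(), whose ConnectionQueryServices hands out Table handles backed by a shared, pooled HConnection. A minimal sketch of that pattern follows, using only the calls visible in the diff; the class and method names in the sketch itself are illustrative, not part of the patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.phoenix.jdbc.PhoenixConnection;
    import org.apache.phoenix.util.QueryUtil;

    // Hypothetical helper illustrating the pooled-connection pattern
    // adopted by PHOENIX-6834.
    public class PooledTargetTableSketch {
        static Table getTargetTable(Configuration conf, byte[] physicalName)
                throws Exception {
            // getConnectionOnServer returns a java.sql.Connection from the
            // server-side Phoenix connection machinery; cast to
            // PhoenixConnection to reach the underlying query services.
            PhoenixConnection pconn =
                    (PhoenixConnection) QueryUtil.getConnectionOnServer(conf);
            // The Table handle comes from the shared HConnection held by
            // ConnectionQueryServices, so no per-scanner HConnection is created.
            // As in the patch, the caller keeps pconn and closes it when the
            // scanner closes (now catching SQLException rather than IOException).
            return pconn.getQueryServices().getTable(physicalName);
        }
    }

Whether the PhoenixConnection could instead be closed as soon as the Table is obtained is left open by the TODO in the committed code.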