This is an automated email from the ASF dual-hosted git repository.

rajeshbabu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix-connectors.git


The following commit(s) were added to refs/heads/master by this push:
     new edd8861  PHOENIX-6694 Avoid unnecessary calls of fetching table metadata to region servers holding the system tables in batch-oriented jobs in Spark or Hive, otherwise those RS become hotspots
edd8861 is described below

commit edd8861a1f2ef6e1dd88c530bffe02986859745d
Author: Rajeshbabu Chintaguntla <[email protected]>
AuthorDate: Thu Jun 2 14:54:09 2022 +0530

    PHOENIX-6694 Avoid unnecessary calls of fetching table metadata to region servers holding the system tables in batch-oriented jobs in Spark or Hive, otherwise those RS become hotspots
    
    Co-authored-by: Rajeshbabu Chintaguntla <[email protected]>
---
 phoenix-spark-base/pom.xml                                 |  6 ++++++
 .../datasource/v2/reader/PhoenixDataSourceReadOptions.java |  8 +++++++-
 .../datasource/v2/reader/PhoenixDataSourceReader.java      | 11 ++++++++---
 .../datasource/v2/reader/PhoenixInputPartitionReader.java  | 14 ++++++++++++++
 phoenix5-spark3/pom.xml                                    |  9 ++++++++-
 .../sql/connector/reader/PhoenixDataSourceReadOptions.java |  9 ++++++++-
 .../spark/sql/connector/reader/PhoenixPartitionReader.java | 14 ++++++++++++++
 .../phoenix/spark/sql/connector/reader/PhoenixScan.java    |  6 ++++--
 8 files changed, 69 insertions(+), 8 deletions(-)
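
In short, the patch serializes the resolved PTable on the driver and ships it to the executors inside the Serializable PhoenixDataSourceReadOptions, so each partition reader can seed its connection's metadata cache instead of calling back to the region server holding SYSTEM.CATALOG. A condensed sketch of that round trip, stitched together from the hunks below (queryPlan and conn stand in for the readers' existing locals):

    // Driver side: serialize the table metadata once per scan.
    byte[] pTableCacheBytes =
        PTableImpl.toProto(queryPlan.getTableRef().getTable()).toByteArray();

    // Executor side: rebuild the PTable and prime the connection-level cache
    // so compiling the SELECT does not touch SYSTEM.CATALOG again.
    PTableProtos.PTable proto;
    try {
        proto = PTableProtos.PTable.parseFrom(pTableCacheBytes);
    } catch (InvalidProtocolBufferException e) {
        throw new RuntimeException("Failed to parse the serialized PTable", e);
    }
    PhoenixConnection phoenixConnection = conn.unwrap(PhoenixConnection.class);
    phoenixConnection.addTable(PTableImpl.createFromProto(proto),
            System.currentTimeMillis());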

diff --git a/phoenix-spark-base/pom.xml b/phoenix-spark-base/pom.xml
index f9d2761..353841c 100644
--- a/phoenix-spark-base/pom.xml
+++ b/phoenix-spark-base/pom.xml
@@ -439,6 +439,12 @@
       <artifactId>slf4j-log4j12</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>2.5.0</version>
+      <scope>provided</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java b/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
index 5b3df46..003a34d 100644
--- a/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
+++ b/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReadOptions.java
@@ -27,9 +27,10 @@ class PhoenixDataSourceReadOptions implements Serializable {
     private final String scn;
     private final String selectStatement;
     private final Properties overriddenProps;
+    private final byte[] pTableCacheBytes;
 
     PhoenixDataSourceReadOptions(String zkUrl, String scn, String tenantId,
-            String selectStatement, Properties overriddenProps) {
+            String selectStatement, Properties overriddenProps, byte[] pTableCacheBytes) {
         if(overriddenProps == null){
             throw new NullPointerException();
         }
@@ -38,6 +39,7 @@ class PhoenixDataSourceReadOptions implements Serializable {
         this.tenantId = tenantId;
         this.selectStatement = selectStatement;
         this.overriddenProps = overriddenProps;
+        this.pTableCacheBytes = pTableCacheBytes;
     }
 
     String getSelectStatement() {
@@ -59,4 +61,8 @@ class PhoenixDataSourceReadOptions implements Serializable {
     Properties getOverriddenProps() {
         return overriddenProps;
     }
+
+    byte[] getPTableCacheBytes() {
+        return pTableCacheBytes;
+    }
 }
diff --git a/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java b/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
index 79d8ba2..3ee821b 100644
--- a/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
+++ b/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixDataSourceReader.java
@@ -23,12 +23,15 @@ import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compat.CompatUtil;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.coprocessor.generated.PTableProtos.PTable;
 import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.mapreduce.PhoenixInputSplit;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.protobuf.ProtobufUtil;
 import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.spark.FilterExpressionCompiler;
 import org.apache.phoenix.spark.SparkSchemaUtil;
 import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource;
@@ -181,10 +184,12 @@ public class PhoenixDataSourceReader implements DataSourceReader, SupportsPushDo
 
                 // Get the region size
                 long regionSize = CompatUtil.getSize(regionLocator, connection.getAdmin(), location);
-
+                byte[] pTableCacheBytes = PTableImpl.toProto(queryPlan.getTableRef().getTable()).
+                    toByteArray();
                 PhoenixDataSourceReadOptions phoenixDataSourceOptions =
-                        new PhoenixDataSourceReadOptions(zkUrl, currentScnValue.orElse(null),
-                                tenantId.orElse(null), selectStatement, overriddenProps);
+                     new PhoenixDataSourceReadOptions(zkUrl, currentScnValue.orElse(null),
+                            tenantId.orElse(null), selectStatement, overriddenProps,
+                         pTableCacheBytes);
                 if (splitByStats) {
                     for (Scan aScan : scans) {
                         partitions.add(getInputPartition(phoenixDataSourceOptions,
diff --git a/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java b/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
index ec12718..841545c 100644
--- a/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
+++ b/phoenix-spark-base/src/main/java/org/apache/phoenix/spark/datasource/v2/reader/PhoenixInputPartitionReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.spark.datasource.v2.reader;
 
+import com.google.protobuf.InvalidProtocolBufferException;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -33,6 +34,8 @@ import org.apache.phoenix.compat.CompatUtil;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.generated.PTableProtos;
+import org.apache.phoenix.coprocessor.generated.PTableProtos.PTable;
 import org.apache.phoenix.iterate.ConcatResultIterator;
 import org.apache.phoenix.iterate.LookAheadResultIterator;
 import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
@@ -41,12 +44,14 @@ import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.iterate.RoundRobinResultIterator;
 import org.apache.phoenix.iterate.SequenceResultIterator;
 import org.apache.phoenix.iterate.TableResultIterator;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.mapreduce.PhoenixInputSplit;
 import org.apache.phoenix.monitoring.ReadMetricQueue;
 import org.apache.phoenix.monitoring.ScanMetricsHolder;
 import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.spark.SerializableWritable;
 import org.apache.spark.executor.InputMetrics;
@@ -94,6 +99,15 @@ public class PhoenixInputPartitionReader implements InputPartitionReader<Interna
         }
         try (Connection conn = DriverManager.getConnection(
                 JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overridingProps)) {
+            PTable pTable = null;
+            try {
+                pTable = PTable.parseFrom(options.getPTableCacheBytes());
+            } catch (InvalidProtocolBufferException e) {
+                throw new RuntimeException("Parsing the PTable Cache Bytes is failing ", e);
+            }
+            org.apache.phoenix.schema.PTable table = PTableImpl.createFromProto(pTable);
+            PhoenixConnection phoenixConnection = conn.unwrap(PhoenixConnection.class);
+            phoenixConnection.addTable(table, System.currentTimeMillis());
             final Statement statement = conn.createStatement();
             final String selectStatement = options.getSelectStatement();
             if (selectStatement == null){
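
The executor-side effect of the addTable(...) call above is that the scanned table now resolves from the primed connection cache. An illustrative check, not part of the patch (assuming PhoenixRuntime.getTable, which consults the connection's cache first and only falls back to a server round trip on a miss):

    // Served from the cache seeded above; without the primed entry, every
    // executor would RPC the region server hosting SYSTEM.CATALOG here.
    org.apache.phoenix.schema.PTable cached =
        PhoenixRuntime.getTable(conn, table.getName().getString());
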
diff --git a/phoenix5-spark3/pom.xml b/phoenix5-spark3/pom.xml
index c53c525..13ff51a 100644
--- a/phoenix5-spark3/pom.xml
+++ b/phoenix5-spark3/pom.xml
@@ -119,6 +119,13 @@
       <scope>provided</scope>
     </dependency>
 
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>2.5.0</version>
+      <scope>provided</scope>
+    </dependency>
+
     <!-- Misc dependencies -->
     <dependency>
       <groupId>joda-time</groupId>
@@ -178,4 +185,4 @@
         </plugin>
     </plugins>
   </build>
-</project>
\ No newline at end of file
+</project>
diff --git a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixDataSourceReadOptions.java b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixDataSourceReadOptions.java
index 4f7408f..4d49150 100644
--- a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixDataSourceReadOptions.java
+++ b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixDataSourceReadOptions.java
@@ -29,9 +29,11 @@ class PhoenixDataSourceReadOptions implements Serializable {
     private final String scn;
     private final String selectStatement;
     private final Properties overriddenProps;
+    private final byte[] pTableCacheBytes;
 
     PhoenixDataSourceReadOptions(String zkUrl, String scn, String tenantId,
-                                 String selectStatement, Properties overriddenProps) {
+                                 String selectStatement, Properties overriddenProps,
+        byte[] pTableCacheBytes) {
         if(overriddenProps == null){
             throw new NullPointerException();
         }
@@ -40,6 +42,7 @@ class PhoenixDataSourceReadOptions implements Serializable {
         this.tenantId = tenantId;
         this.selectStatement = selectStatement;
         this.overriddenProps = overriddenProps;
+        this.pTableCacheBytes = pTableCacheBytes;
     }
 
     String getSelectStatement() {
@@ -69,4 +72,8 @@ class PhoenixDataSourceReadOptions implements Serializable {
         }
         return overriddenProps;
     }
+
+    byte[] getPTableCacheBytes() {
+        return pTableCacheBytes;
+    }
 }
diff --git a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixPartitionReader.java b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixPartitionReader.java
index 62c88b5..4cbc237 100644
--- a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixPartitionReader.java
+++ b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixPartitionReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.spark.sql.connector.reader;
 
+import com.google.protobuf.InvalidProtocolBufferException;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -33,6 +34,7 @@ import org.apache.phoenix.compat.CompatUtil;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.generated.PTableProtos.PTable;
 import org.apache.phoenix.iterate.ConcatResultIterator;
 import org.apache.phoenix.iterate.LookAheadResultIterator;
 import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
@@ -41,11 +43,13 @@ import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.iterate.RoundRobinResultIterator;
 import org.apache.phoenix.iterate.SequenceResultIterator;
 import org.apache.phoenix.iterate.TableResultIterator;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.monitoring.ReadMetricQueue;
 import org.apache.phoenix.monitoring.ScanMetricsHolder;
 import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.schema.PTableImpl;
 import org.apache.spark.executor.InputMetrics;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.read.PartitionReader;
@@ -79,8 +83,18 @@ public class PhoenixPartitionReader implements PartitionReader<InternalRow> {
     private QueryPlan getQueryPlan() throws SQLException {
         String zkUrl = options.getZkUrl();
         Properties overridingProps = getOverriddenPropsFromOptions();
+        overridingProps.put("phoenix.skip.system.tables.existence.check", Boolean.valueOf("true"));
         try (Connection conn = DriverManager.getConnection(
                 JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overridingProps)) {
+            PTable pTable = null;
+            try {
+                pTable = PTable.parseFrom(options.getPTableCacheBytes());
+            } catch (InvalidProtocolBufferException e) {
+                throw new RuntimeException("Parsing the PTable Cache Bytes is failing ", e);
+            }
+            org.apache.phoenix.schema.PTable table = PTableImpl.createFromProto(pTable);
+            PhoenixConnection phoenixConnection = conn.unwrap(PhoenixConnection.class);
+            phoenixConnection.addTable(table, System.currentTimeMillis());
             final Statement statement = conn.createStatement();
             final String selectStatement = options.getSelectStatement();
             if (selectStatement == null){
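
The Spark 3 reader additionally sets phoenix.skip.system.tables.existence.check (property name taken verbatim from the hunk above) before opening the executor-side connection, so the client also skips probing the SYSTEM tables at connect time. One caveat when reusing this pattern: java.util.Properties.getProperty returns null for non-String values, so setProperty with the literal "true" is the more defensive way to set the flag (a sketch under that assumption):

    Properties props = new Properties();
    // Stored as a String so Properties.getProperty(...) can observe it.
    props.setProperty("phoenix.skip.system.tables.existence.check", "true");
    Connection conn = DriverManager.getConnection(
            JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, props);
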
diff --git a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixScan.java b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixScan.java
index ecdf7f6..451d7b1 100644
--- a/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixScan.java
+++ b/phoenix5-spark3/src/main/java/org/apache/phoenix/spark/sql/connector/reader/PhoenixScan.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.spark.sql.connector.reader;
 
+import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
@@ -146,10 +147,11 @@ public class PhoenixScan implements Scan, Batch {
 
                 // Get the region size
                 long regionSize = CompatUtil.getSize(regionLocator, connection.getAdmin(), location);
-
+                byte[] pTableCacheBytes = PTableImpl.toProto(queryPlan.getTableRef().getTable()).
+                    toByteArray();
                 phoenixDataSourceOptions =
                         new PhoenixDataSourceReadOptions(zkUrl, currentScnValue,
-                                tenantId, selectStatement, overriddenProps);
+                                tenantId, selectStatement, overriddenProps, pTableCacheBytes);
 
                 if (splitByStats) {
                     for (org.apache.hadoop.hbase.client.Scan aScan : scans) {
