This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new a0b04bb923 PHOENIX-7684 Introduce Segment Scan (#2255)
a0b04bb923 is described below

commit a0b04bb9233d3326abbcd150017eee9f4e4bbcc7
Author: Viraj Jasani <vjas...@apache.org>
AuthorDate: Wed Aug 13 14:47:42 2025 -0700

    PHOENIX-7684 Introduce Segment Scan (#2255)
---
 .../org/apache/phoenix/compile/QueryCompiler.java  |   9 +
 .../apache/phoenix/compile/StatementContext.java   |  20 +
 .../org/apache/phoenix/compile/WhereCompiler.java  | 181 ++++-
 .../apache/phoenix/exception/SQLExceptionCode.java |   1 +
 .../apache/phoenix/execute/SegmentInfoPlan.java    |  93 +++
 .../apache/phoenix/expression/ExpressionType.java  |  11 +-
 .../expression/function/ScanEndKeyFunction.java    |  72 ++
 .../expression/function/ScanStartKeyFunction.java  |  73 ++
 .../expression/function/TotalSegmentsFunction.java |  68 ++
 .../phoenix/iterate/SegmentResultIterator.java     | 124 +++
 .../apache/phoenix/parse/ScanEndKeyParseNode.java  |  37 +
 .../phoenix/parse/ScanStartKeyParseNode.java       |  37 +
 .../phoenix/parse/TotalSegmentsParseNode.java      |  37 +
 .../end2end/BaseTotalSegmentsFunctionIT.java       | 872 +++++++++++++++++++++
 .../phoenix/end2end/ScanBoundaryFunction2IT.java   | 310 ++++++++
 .../phoenix/end2end/ScanBoundaryFunctionIT.java    | 620 +++++++++++++++
 .../phoenix/end2end/TotalSegmentsFunction2IT.java  |  45 ++
 .../phoenix/end2end/TotalSegmentsFunctionIT.java   |  44 ++
 .../phoenix/compile/ScanBoundaryFunctionTest.java  | 217 +++++
 .../function/BuiltinFunctionConstructorTest.java   |   6 +-
 20 files changed, 2873 insertions(+), 4 deletions(-)

diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index db20f6e24b..6a7d9ca9de 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -46,6 +46,7 @@ import org.apache.phoenix.execute.HashJoinPlan.HashSubPlan;
 import org.apache.phoenix.execute.HashJoinPlan.WhereClauseSubPlan;
 import org.apache.phoenix.execute.LiteralResultIterationPlan;
 import org.apache.phoenix.execute.ScanPlan;
+import org.apache.phoenix.execute.SegmentInfoPlan;
 import org.apache.phoenix.execute.SortMergeJoinPlan;
 import org.apache.phoenix.execute.TupleProjectionPlan;
 import org.apache.phoenix.execute.TupleProjector;
@@ -892,6 +893,14 @@ public class QueryCompiler {
     if (plan instanceof BaseQueryPlan) {
       ((BaseQueryPlan) plan).setApplicable(isApplicable);
     }
+
+    // Check if TOTAL_SEGMENTS function was used - if so, replace with a 
client-side plan that
+    // returns segment info
+    if (context.hasTotalSegmentsFunction()) {
+      plan = new SegmentInfoPlan(context, planSelect, tableRef, projector,
+        context.getTotalSegmentsValue());
+    }
+
     return plan;
   }
 }
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/StatementContext.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/StatementContext.java
index f07ea7ad60..0c09a4f941 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -87,6 +87,8 @@ public class StatementContext {
   private TableRef cdcDataTableRef;
   private AtomicBoolean hasFirstValidResult;
   private Set<StatementContext> subStatementContexts;
+  private boolean totalSegmentsFunction = false;
+  private Integer totalSegmentsValue;
 
   public StatementContext(PhoenixStatement statement) {
     this(statement, new Scan());
@@ -117,6 +119,8 @@ public class StatementContext {
     this.isUncoveredIndex = context.isUncoveredIndex;
     this.hasFirstValidResult = new 
AtomicBoolean(context.getHasFirstValidResult());
     this.subStatementContexts = Sets.newHashSet();
+    this.totalSegmentsFunction = context.totalSegmentsFunction;
+    this.totalSegmentsValue = context.totalSegmentsValue;
   }
 
   /**
@@ -435,4 +439,20 @@ public class StatementContext {
   public Set<StatementContext> getSubStatementContexts() {
     return subStatementContexts;
   }
+
+  /**
+   * Returns the flag stored through {@link #setTotalSegmentsFunction(boolean)}. The flag starts
+   * out as {@code false} and is copied over when a context is built from another context.
+   */
+  public boolean hasTotalSegmentsFunction() {
+    return totalSegmentsFunction;
+  }
+
+  /**
+   * Stores the flag that is later reported by {@link #hasTotalSegmentsFunction()}.
+   * @param totalSegmentsFunction new value of the flag
+   */
+  public void setTotalSegmentsFunction(boolean totalSegmentsFunction) {
+    this.totalSegmentsFunction = totalSegmentsFunction;
+  }
+
+  /**
+   * Returns the value stored through {@link #setTotalSegmentsValue(Integer)}; {@code null} when
+   * no value has been stored (the field has no initializer).
+   */
+  public Integer getTotalSegmentsValue() {
+    return totalSegmentsValue;
+  }
+
+  /**
+   * Stores the value that is later reported by {@link #getTotalSegmentsValue()}.
+   * @param totalSegmentsValue value to store, may be {@code null}
+   */
+  public void setTotalSegmentsValue(Integer totalSegmentsValue) {
+    this.totalSegmentsValue = totalSegmentsValue;
+  }
 }
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
index 1926b1b79b..9cb55e9aaf 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
@@ -38,6 +38,7 @@ import java.util.Set;
 import org.apache.hadoop.hbase.CompareOperator;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
@@ -75,7 +76,11 @@ import org.apache.phoenix.expression.SubtractExpression;
 import org.apache.phoenix.expression.function.ArrayAnyComparisonExpression;
 import org.apache.phoenix.expression.function.ArrayElemRefExpression;
 import org.apache.phoenix.expression.function.ScalarFunction;
+import org.apache.phoenix.expression.function.ScanEndKeyFunction;
+import org.apache.phoenix.expression.function.ScanStartKeyFunction;
 import org.apache.phoenix.expression.function.SingleAggregateFunction;
+import org.apache.phoenix.expression.function.TotalSegmentsFunction;
+import org.apache.phoenix.expression.visitor.CloneExpressionVisitor;
 import org.apache.phoenix.expression.visitor.KeyValueExpressionVisitor;
 import org.apache.phoenix.expression.visitor.TraverseAllExpressionVisitor;
 import org.apache.phoenix.filter.MultiCFCQKeyValueComparisonFilter;
@@ -85,6 +90,7 @@ import org.apache.phoenix.filter.RowKeyComparisonFilter;
 import org.apache.phoenix.filter.SingleCFCQKeyValueComparisonFilter;
 import org.apache.phoenix.filter.SingleCQKeyValueComparisonFilter;
 import org.apache.phoenix.parse.ColumnParseNode;
+import org.apache.phoenix.parse.ComparisonParseNode;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.parse.ParseNode;
@@ -106,6 +112,7 @@ import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.TypeMismatchException;
 import org.apache.phoenix.schema.types.PBoolean;
+import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.ExpressionUtil;
@@ -188,10 +195,25 @@ public class WhereCompiler {
     }
 
     Set<Expression> extractedNodes = Sets.<Expression> newHashSet();
-    WhereExpressionCompiler whereCompiler = new 
WhereExpressionCompiler(context);
+    ScanBoundaryExtractingCompiler whereCompiler = new 
ScanBoundaryExtractingCompiler(context);
     Expression expression = where == null
       ? LiteralExpression.newConstant(true, PBoolean.INSTANCE, 
Determinism.ALWAYS)
       : where.accept(whereCompiler);
+
+    if (whereCompiler.getScanStartKey() != null || 
whereCompiler.getScanEndKey() != null) {
+      // Remove scan boundary functions from the expression
+      ScanBoundaryRemovalVisitor removalVisitor = new 
ScanBoundaryRemovalVisitor();
+      expression = expression.accept(removalVisitor);
+    }
+
+    // Check for TOTAL_SEGMENTS function
+    if (whereCompiler.hasTotalSegments()) {
+      context.setTotalSegmentsFunction(true);
+      context.setTotalSegmentsValue(whereCompiler.getTotalSegmentsValue());
+      // Remove TOTAL_SEGMENTS function from the expression
+      ScanBoundaryRemovalVisitor removalVisitor = new 
ScanBoundaryRemovalVisitor();
+      expression = expression.accept(removalVisitor);
+    }
     if (whereCompiler.isAggregate()) {
       throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.AGGREGATE_IN_WHERE).build()
         .buildException();
@@ -222,13 +244,22 @@ public class WhereCompiler {
       expression = WhereOptimizer.pushKeyExpressionsToScan(context, hints, 
expression,
         extractedNodes, minOffset);
     }
+    if (whereCompiler.getScanStartKey() != null || 
whereCompiler.getScanEndKey() != null) {
+      Scan scan = context.getScan();
+      if (scan.getStartRow().length == 0 && whereCompiler.getScanStartKey() != 
null) {
+        scan.withStartRow(whereCompiler.getScanStartKey());
+      }
+      if (scan.getStopRow().length == 0 && whereCompiler.getScanEndKey() != 
null) {
+        scan.withStopRow(whereCompiler.getScanEndKey());
+      }
+    }
     setScanFilter(context, statement, expression, 
whereCompiler.disambiguateWithFamily);
 
     return expression;
   }
 
   public static class WhereExpressionCompiler extends ExpressionCompiler {
-    private boolean disambiguateWithFamily;
+    protected boolean disambiguateWithFamily;
 
     public WhereExpressionCompiler(StatementContext context) {
       super(context, true);
@@ -294,6 +325,152 @@ public class WhereCompiler {
     }
   }
 
+  /**
+   * WHERE clause compiler that, while compiling as usual, also captures the literal operand of
+   * equality comparisons against SCAN_START_KEY(), SCAN_END_KEY() and TOTAL_SEGMENTS().
+   */
+  private static final class ScanBoundaryExtractingCompiler extends WhereExpressionCompiler {
+
+    private byte[] scanStartKey;
+    private byte[] scanEndKey;
+    private boolean hasTotalSegments = false;
+    private Integer totalSegmentsValue;
+
+    private ScanBoundaryExtractingCompiler(StatementContext context) {
+      super(context);
+    }
+
+    /**
+     * Inspects the already-compiled operands of the comparison before delegating to the parent
+     * compiler: for an equality between one of the segment scan functions and a literal, the
+     * literal is remembered on this compiler.
+     * @param node     comparison parse node being left
+     * @param children compiled operands of the comparison
+     * @return the compiled comparison; a scan start/end key comparison that compiled into a
+     *         boolean NULL literal is returned as a constant TRUE instead
+     * @throws SQLException if the TOTAL_SEGMENTS() operand is null or not greater than 0, or if
+     *                      the parent compiler fails
+     */
+    @Override
+    public Expression visitLeave(ComparisonParseNode node, List<Expression> children)
+      throws SQLException {
+      boolean hasScanFunctionWithEquals = false;
+      if (node.getFilterOp() == CompareOperator.EQUAL && children.size() == 2) {
+        Expression lhs = children.get(0);
+        Expression rhs = children.get(1);
+
+        if (lhs instanceof ScanStartKeyFunction && rhs instanceof LiteralExpression) {
+          scanStartKey = extractBytes((LiteralExpression) rhs);
+          hasScanFunctionWithEquals = true;
+        } else if (rhs instanceof ScanStartKeyFunction && lhs instanceof LiteralExpression) {
+          scanStartKey = extractBytes((LiteralExpression) lhs);
+          hasScanFunctionWithEquals = true;
+        }
+
+        if (lhs instanceof ScanEndKeyFunction && rhs instanceof LiteralExpression) {
+          scanEndKey = extractBytes((LiteralExpression) rhs);
+          hasScanFunctionWithEquals = true;
+        } else if (rhs instanceof ScanEndKeyFunction && lhs instanceof LiteralExpression) {
+          scanEndKey = extractBytes((LiteralExpression) lhs);
+          hasScanFunctionWithEquals = true;
+        }
+
+        if (lhs instanceof TotalSegmentsFunction && rhs instanceof LiteralExpression) {
+          hasTotalSegments = true;
+          totalSegmentsValue = getTotalSegmentsVal((LiteralExpression) rhs);
+        } else if (rhs instanceof TotalSegmentsFunction && lhs instanceof LiteralExpression) {
+          hasTotalSegments = true;
+          totalSegmentsValue = getTotalSegmentsVal((LiteralExpression) lhs);
+        }
+      }
+      Expression expression = super.visitLeave(node, children);
+      // Only scan start/end key comparisons are rewritten here; a boolean NULL result is swapped
+      // for TRUE (presumably so that the comparison itself never rejects rows - TODO confirm).
+      if (
+        hasScanFunctionWithEquals && expression instanceof LiteralExpression
+          && LiteralExpression.isBooleanNull(expression)
+      ) {
+        return LiteralExpression.newConstant(true, PBoolean.INSTANCE, Determinism.ALWAYS);
+      }
+      return expression;
+    }
+
+    /**
+     * Evaluates the literal without a tuple and copies the resulting bytes.
+     * @return a copy of the literal's bytes, or {@code null} if the literal does not evaluate
+     */
+    private byte[] extractBytes(LiteralExpression literal) {
+      ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+      if (literal.evaluate(null, ptr)) {
+        return ptr.copyBytes();
+      }
+      return null;
+    }
+
+    /**
+     * Decodes the literal compared against TOTAL_SEGMENTS() as an INTEGER.
+     * @return the decoded value, never {@code null} and always greater than 0
+     * @throws SQLException with {@link SQLExceptionCode#INVALID_TOTAL_SEGMENTS_VALUE} if the
+     *                      literal does not evaluate, decodes to null, or is not greater than 0
+     */
+    private Integer getTotalSegmentsVal(LiteralExpression literal) throws SQLException {
+      ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+      Integer value = null;
+      if (literal.evaluate(null, ptr)) {
+        value = (Integer) PInteger.INSTANCE.toObject(ptr);
+      }
+      // A null value must be rejected here as well: hasTotalSegments is already set by the
+      // caller, and the value is later unboxed to an int when the segment plan is executed.
+      if (value == null || value <= 0) {
+        throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_TOTAL_SEGMENTS_VALUE).build()
+          .buildException();
+      }
+      return value;
+    }
+
+    public byte[] getScanStartKey() {
+      return scanStartKey;
+    }
+
+    public byte[] getScanEndKey() {
+      return scanEndKey;
+    }
+
+    public boolean hasTotalSegments() {
+      return hasTotalSegments;
+    }
+
+    public Integer getTotalSegmentsValue() {
+      return totalSegmentsValue;
+    }
+  }
+
+  /**
+   * Cloning visitor that strips equality comparisons against SCAN_START_KEY(), SCAN_END_KEY() and
+   * TOTAL_SEGMENTS() out of an expression tree: each such comparison becomes a constant TRUE, and
+   * TRUE operands are pruned from AND expressions.
+   */
+  private static class ScanBoundaryRemovalVisitor extends CloneExpressionVisitor {
+    @Override
+    public Expression visitLeave(ComparisonExpression node, List<Expression> children) {
+      Expression left = children.get(0);
+      Expression right = children.get(1);
+
+      boolean isEquality = node.getFilterOp() == CompareOperator.EQUAL;
+      if (isEquality && (isSegmentScanFunction(left) || isSegmentScanFunction(right))) {
+        return alwaysTrue();
+      }
+      return super.visitLeave(node, children);
+    }
+
+    @Override
+    public Expression visitLeave(AndExpression node, List<Expression> children) {
+      List<Expression> retained = Lists.newArrayListWithCapacity(children.size());
+      for (Expression operand : children) {
+        if (LiteralExpression.isTrue(operand)) {
+          // Constant TRUE operands contribute nothing to a conjunction.
+          continue;
+        }
+        retained.add(operand);
+      }
+      if (retained.isEmpty()) {
+        return alwaysTrue();
+      }
+      try {
+        return AndExpression.create(retained);
+      } catch (SQLException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    /** Tells whether the expression is one of the three segment scan functions. */
+    private static boolean isSegmentScanFunction(Expression expression) {
+      return expression instanceof ScanStartKeyFunction
+        || expression instanceof ScanEndKeyFunction
+        || expression instanceof TotalSegmentsFunction;
+    }
+
+    /** Builds the constant TRUE literal, rethrowing a SQLException as a RuntimeException. */
+    private static Expression alwaysTrue() {
+      try {
+        return LiteralExpression.newConstant(true, PBoolean.INSTANCE, Determinism.ALWAYS);
+      } catch (SQLException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
   private static final class Counter {
     public enum Count {
       NONE,
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 99a57f3a22..7ecd02528d 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -103,6 +103,7 @@ public enum SQLExceptionCode {
   SUBQUERY_SELECT_LIST_COLUMN_MUST_HAS_ALIAS(219, "23019",
     "Every column in subquery select lists must has alias when used for 
join."),
   ROW_KEY_OUT_OF_SCAN_RANGE(220, "23020", "Row key is out of scan start/stop 
key boundaries"),
+  INVALID_TOTAL_SEGMENTS_VALUE(221, "23021", "TOTAL_SEGMENTS() value must be 
greater than 0"),
 
   CONCURRENT_TABLE_MUTATION(301, "23000", "Concurrent modification to table.", 
new Factory() {
     @Override
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/SegmentInfoPlan.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/SegmentInfoPlan.java
new file mode 100644
index 0000000000..79134600a2
--- /dev/null
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/SegmentInfoPlan.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.ExplainPlanAttributes.ExplainPlanAttributesBuilder;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.SegmentResultIterator;
+import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.TableRef;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableSet;
+
+/**
+ * Client-side plan that, instead of executing a server-side scan, looks up the regions of the
+ * table and returns a {@link SegmentResultIterator} built from them and the requested number of
+ * segments.
+ */
+public class SegmentInfoPlan extends ClientProcessingPlan {
+
+  private final Integer totalSegmentsValue;
+  private final Set<TableRef> sourceTables;
+
+  /**
+   * @param totalSegmentsValue number of segments requested by the query; a {@code null} value is
+   *                           rejected when the plan is iterated
+   */
+  public SegmentInfoPlan(StatementContext context, FilterableStatement statement, TableRef tableRef,
+    RowProjector projector, Integer totalSegmentsValue) {
+    super(context, statement, tableRef, projector, null, null, null, OrderBy.EMPTY_ORDER_BY, null);
+    this.totalSegmentsValue = totalSegmentsValue;
+    this.sourceTables = ImmutableSet.of(tableRef);
+  }
+
+  @Override
+  public Set<TableRef> getSourceRefs() {
+    return sourceTables;
+  }
+
+  @Override
+  public boolean isApplicable() {
+    return true;
+  }
+
+  /**
+   * Fetches all regions of the physical table, using the configured thread timeout, and wraps
+   * them in a {@link SegmentResultIterator}. The {@code scanGrouper} and {@code scan} arguments
+   * are not used.
+   * @throws SQLException if no TOTAL_SEGMENTS() value is available or the region lookup fails
+   */
+  @Override
+  public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
+    if (totalSegmentsValue == null) {
+      // Unboxing a null Integer below would otherwise surface as a NullPointerException.
+      throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_TOTAL_SEGMENTS_VALUE).build()
+        .buildException();
+    }
+    ConnectionQueryServices services = context.getConnection().getQueryServices();
+    byte[] tableName = table.getTable().getPhysicalName().getBytes();
+    int queryTimeout = services.getProps().getInt(QueryServices.THREAD_TIMEOUT_MS_ATTRIB,
+      QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
+
+    List<HRegionLocation> regions = services.getAllTableRegions(tableName, queryTimeout);
+
+    return new SegmentResultIterator(regions, totalSegmentsValue);
+  }
+
+  @Override
+  public <T> T accept(QueryPlanVisitor<T> visitor) {
+    return visitor.defaultReturn(this);
+  }
+
+  @Override
+  public ExplainPlan getExplainPlan() throws SQLException {
+    List<String> planSteps = Collections.singletonList("CLIENT REGION SCAN");
+    ExplainPlanAttributesBuilder builder = new ExplainPlanAttributesBuilder();
+    return new ExplainPlan(planSteps, builder.build());
+  }
+}
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/ExpressionType.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/ExpressionType.java
index f3d8993f85..50112cc939 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/ExpressionType.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/ExpressionType.java
@@ -18,8 +18,10 @@
 package org.apache.phoenix.expression;
 
 import java.util.Map;
+import java.util.Set;
 import org.apache.phoenix.expression.function.*;
 
+import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableSet;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 
 /**
@@ -201,12 +203,19 @@ public enum ExpressionType {
   DecodeBinaryFunction(DecodeBinaryFunction.class),
   EncodeBinaryFunction(EncodeBinaryFunction.class),
   DecodeViewIdFunction(DecodeViewIndexIdFunction.class),
-  SubBinaryFunction(SubBinaryFunction.class);
+  SubBinaryFunction(SubBinaryFunction.class),
+  ScanStartKeyFunction(ScanStartKeyFunction.class),
+  ScanEndKeyFunction(ScanEndKeyFunction.class),
+  TotalSegmentsFunction(TotalSegmentsFunction.class);
 
   ExpressionType(Class<? extends Expression> clazz) {
     this.clazz = clazz;
   }
 
+  public static final Set<Class<? extends Expression>> 
EXPRESSION_TYPES_NOT_SUPPORTED_AT_SERVER =
+    ImmutableSet.of(ScanStartKeyFunction.getExpressionClass(),
+      ScanEndKeyFunction.getExpressionClass(), 
TotalSegmentsFunction.getExpressionClass());
+
   public Class<? extends Expression> getExpressionClass() {
     return clazz;
   }
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/ScanEndKeyFunction.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/ScanEndKeyFunction.java
new file mode 100644
index 0000000000..2d2bb93f2a
--- /dev/null
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/ScanEndKeyFunction.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import java.util.List;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.expression.Determinism;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
+import org.apache.phoenix.parse.ScanEndKeyParseNode;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PVarbinary;
+
+/**
+ * Built-in, zero-argument SCAN_END_KEY() function of VARBINARY type. When evaluated against a
+ * tuple it yields the value of the cell stored under qualifier "1" in the default column family
+ * (presumably the cell produced by the segment result iterator - TODO confirm).
+ */
+@BuiltInFunction(name = ScanEndKeyFunction.NAME, args = {}, nodeClass = ScanEndKeyParseNode.class)
+public class ScanEndKeyFunction extends ScalarFunction {
+
+  public static final String NAME = "SCAN_END_KEY";
+
+  // Qualifier of the cell that holds the end key; encoded once instead of on every evaluation.
+  private static final byte[] END_KEY_QUALIFIER = Bytes.toBytes("1");
+
+  public ScanEndKeyFunction(List<Expression> children) {
+    super(children);
+  }
+
+  @Override
+  public String getName() {
+    return NAME;
+  }
+
+  @Override
+  public PDataType getDataType() {
+    return PVarbinary.INSTANCE;
+  }
+
+  @Override
+  public boolean isStateless() {
+    return false;
+  }
+
+  @Override
+  public Determinism getDeterminism() {
+    return Determinism.PER_ROW;
+  }
+
+  /**
+   * Points {@code ptr} at a copy of the end key cell's value.
+   * @return {@code true} if the value was set; {@code false} if there is no tuple or the tuple
+   *         carries no end key cell
+   */
+  @Override
+  public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+    if (tuple == null) {
+      return false;
+    }
+    Cell cell = tuple.getValue(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, END_KEY_QUALIFIER);
+    if (cell == null) {
+      // The tuple has no such cell; report "no value" rather than failing inside cloneValue.
+      return false;
+    }
+    ptr.set(CellUtil.cloneValue(cell));
+    return true;
+  }
+}
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/ScanStartKeyFunction.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/ScanStartKeyFunction.java
new file mode 100644
index 0000000000..e6d077e3e2
--- /dev/null
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/ScanStartKeyFunction.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import java.util.List;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.expression.Determinism;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
+import org.apache.phoenix.parse.ScanStartKeyParseNode;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PVarbinary;
+
+/**
+ * Built-in, zero-argument SCAN_START_KEY() function of VARBINARY type. When evaluated against a
+ * tuple it yields the value of the cell stored under qualifier "0" in the default column family
+ * (presumably the cell produced by the segment result iterator - TODO confirm).
+ */
+@BuiltInFunction(name = ScanStartKeyFunction.NAME, args = {},
+    nodeClass = ScanStartKeyParseNode.class)
+public class ScanStartKeyFunction extends ScalarFunction {
+
+  public static final String NAME = "SCAN_START_KEY";
+
+  // Qualifier of the cell that holds the start key; encoded once instead of on every evaluation.
+  private static final byte[] START_KEY_QUALIFIER = Bytes.toBytes("0");
+
+  public ScanStartKeyFunction(List<Expression> children) {
+    super(children);
+  }
+
+  @Override
+  public String getName() {
+    return NAME;
+  }
+
+  @Override
+  public PDataType getDataType() {
+    return PVarbinary.INSTANCE;
+  }
+
+  @Override
+  public boolean isStateless() {
+    return false;
+  }
+
+  @Override
+  public Determinism getDeterminism() {
+    return Determinism.PER_ROW;
+  }
+
+  /**
+   * Points {@code ptr} at a copy of the start key cell's value.
+   * @return {@code true} if the value was set; {@code false} if there is no tuple or the tuple
+   *         carries no start key cell
+   */
+  @Override
+  public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+    if (tuple == null) {
+      return false;
+    }
+    Cell cell = tuple.getValue(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, START_KEY_QUALIFIER);
+    if (cell == null) {
+      // The tuple has no such cell; report "no value" rather than failing inside cloneValue.
+      return false;
+    }
+    ptr.set(CellUtil.cloneValue(cell));
+    return true;
+  }
+}
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/TotalSegmentsFunction.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/TotalSegmentsFunction.java
new file mode 100644
index 0000000000..8c64493283
--- /dev/null
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/expression/function/TotalSegmentsFunction.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import java.sql.SQLException;
+import java.util.List;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.Determinism;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
+import org.apache.phoenix.parse.TotalSegmentsParseNode;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PInteger;
+
+/**
+ * Built-in, zero-argument TOTAL_SEGMENTS() function of INTEGER type. The function is a marker
+ * only: it is never meant to be evaluated, and {@link #evaluate(Tuple, ImmutableBytesWritable)}
+ * always throws.
+ */
+@BuiltInFunction(name = TotalSegmentsFunction.NAME, args = {},
+    nodeClass = TotalSegmentsParseNode.class)
+public class TotalSegmentsFunction extends ScalarFunction {
+
+  public static final String NAME = "TOTAL_SEGMENTS";
+
+  public TotalSegmentsFunction(List<Expression> children) throws SQLException {
+    super(children);
+  }
+
+  @Override
+  public String getName() {
+    return NAME;
+  }
+
+  @Override
+  public PDataType getDataType() {
+    return PInteger.INSTANCE;
+  }
+
+  @Override
+  public boolean isStateless() {
+    return false;
+  }
+
+  @Override
+  public Determinism getDeterminism() {
+    return Determinism.PER_ROW;
+  }
+
+  /**
+   * Always fails: this function has no per-row value.
+   * @throws IllegalStateException on every call
+   */
+  @Override
+  public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+    throw new IllegalStateException("TOTAL_SEGMENTS() should not be 
+evaluated");
+  }
+}
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/SegmentResultIterator.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/SegmentResultIterator.java
new file mode 100644
index 0000000000..1bed34afbd
--- /dev/null
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/SegmentResultIterator.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import 
org.apache.phoenix.compile.ExplainPlanAttributes.ExplainPlanAttributesBuilder;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.tuple.ResultTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ * Result iterator that emits one row per segment of a table, computed on the client from the
+ * supplied region locations. Each row carries two cells in the default column family: qualifier
+ * "0" holds the segment start key and qualifier "1" holds the segment end key. The row key is a
+ * zero-based running counter encoded with {@code Bytes.toBytes(int)}.
+ */
+public class SegmentResultIterator extends BaseResultIterator {
+
+  /** Plan step reported by both {@code explain} overloads. */
+  private static final String EXPLAIN_STEP = "CLIENT SEGMENT SCAN";
+  /** Qualifier of the cell holding the segment start key. */
+  private static final byte[] START_KEY_QUALIFIER = Bytes.toBytes("0");
+  /** Qualifier of the cell holding the segment end key. */
+  private static final byte[] END_KEY_QUALIFIER = Bytes.toBytes("1");
+
+  private final Iterator<Segment> segmentIterator;
+  /** Counter used as the row key of the next emitted row. */
+  private int key;
+
+  /**
+   * @param regions       region locations of the table, in the order they should be bucketed
+   * @param totalSegments maximum number of segments to produce; must be positive
+   * @throws IllegalArgumentException if {@code totalSegments} is not positive
+   */
+  public SegmentResultIterator(List<HRegionLocation> regions, int totalSegments) {
+    this.segmentIterator = getSegments(regions, totalSegments).iterator();
+    this.key = 0;
+  }
+
+  /**
+   * Returns the next segment as a two-cell row, or {@code null} once all segments were returned.
+   */
+  @Override
+  public Tuple next() throws SQLException {
+    if (!segmentIterator.hasNext()) {
+      return null;
+    }
+
+    Segment segment = segmentIterator.next();
+    byte[] rowKey = Bytes.toBytes(key);
+    key++;
+
+    // Cells are added in qualifier order ("0" before "1").
+    List<Cell> cells = new ArrayList<>(2);
+    cells.add(new KeyValue(rowKey, QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+      START_KEY_QUALIFIER, segment.getStartKey()));
+    cells.add(new KeyValue(rowKey, QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+      END_KEY_QUALIFIER, segment.getEndKey()));
+
+    return new ResultTuple(Result.create(cells));
+  }
+
+  /** Start/end key pair describing one segment. */
+  public static class Segment {
+    final byte[] startKey;
+    final byte[] endKey;
+
+    public Segment(byte[] startKey, byte[] endKey) {
+      this.startKey = startKey;
+      this.endKey = endKey;
+    }
+
+    public byte[] getStartKey() {
+      return startKey;
+    }
+
+    public byte[] getEndKey() {
+      return endKey;
+    }
+  }
+
+  /**
+   * Groups consecutive regions into at most {@code numBuckets} segments.
+   * <p>
+   * When there are no more regions than buckets, every region becomes its own segment. Otherwise
+   * the regions are split into exactly {@code numBuckets} contiguous groups whose sizes differ by
+   * at most one (the first {@code size % numBuckets} groups get the extra region); each segment
+   * spans from the start key of its first region to the end key of its last region.
+   * @param regions    region locations in the order they should be grouped
+   * @param numBuckets requested number of segments; must be positive
+   * @return the segments, in region order
+   * @throws IllegalArgumentException if {@code numBuckets} is not positive
+   */
+  public static List<Segment> getSegments(List<HRegionLocation> regions, int numBuckets) {
+    // Without this guard, 0 would fail with a division by zero further down and a negative
+    // value would silently yield no segments at all.
+    if (numBuckets <= 0) {
+      throw new IllegalArgumentException("numBuckets must be positive, but was " + numBuckets);
+    }
+
+    int size = regions.size();
+    List<Segment> segments = new ArrayList<>(Math.min(size, numBuckets));
+    if (size <= numBuckets) {
+      for (HRegionLocation region : regions) {
+        segments.add(new Segment(region.getRegion().getStartKey(), region.getRegion().getEndKey()));
+      }
+      return segments;
+    }
+
+    int q = size / numBuckets;
+    int r = size % numBuckets;
+
+    int currentIndex = 0;
+    for (int i = 0; i < numBuckets; i++) {
+      // The first r buckets absorb the remainder, one extra region each.
+      int bucketSize = q + (i < r ? 1 : 0);
+      HRegionLocation first = regions.get(currentIndex);
+      HRegionLocation last = regions.get(currentIndex + bucketSize - 1);
+
+      segments.add(new Segment(first.getRegion().getStartKey(), last.getRegion().getEndKey()));
+      currentIndex += bucketSize;
+    }
+    return segments;
+  }
+
+  @Override
+  public void explain(List<String> planSteps) {
+    planSteps.add(EXPLAIN_STEP);
+  }
+
+  /** Adds the same plan step as the single-argument overload; the builder is left untouched. */
+  @Override
+  public void explain(List<String> planSteps,
+    ExplainPlanAttributesBuilder explainPlanAttributesBuilder) {
+    planSteps.add(EXPLAIN_STEP);
+  }
+}
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ScanEndKeyParseNode.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ScanEndKeyParseNode.java
new file mode 100644
index 0000000000..a9f4072f4d
--- /dev/null
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ScanEndKeyParseNode.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import java.sql.SQLException;
+import java.util.List;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.function.ScanEndKeyFunction;
+
+/**
+ * Function parse node whose {@link #create} builds a {@link ScanEndKeyFunction}.
+ */
+public class ScanEndKeyParseNode extends FunctionParseNode {
+
+  public ScanEndKeyParseNode(String name, List<ParseNode> children, BuiltInFunctionInfo info) {
+    super(name, children, info);
+  }
+
+  /**
+   * Wraps the given child expressions in a new {@link ScanEndKeyFunction}; the statement context
+   * is not consulted.
+   */
+  @Override
+  public Expression create(List<Expression> children, StatementContext context)
+    throws SQLException {
+    Expression function = new ScanEndKeyFunction(children);
+    return function;
+  }
+}
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ScanStartKeyParseNode.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ScanStartKeyParseNode.java
new file mode 100644
index 0000000000..b4a08ffd52
--- /dev/null
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ScanStartKeyParseNode.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import java.sql.SQLException;
+import java.util.List;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.function.ScanStartKeyFunction;
+
+/**
+ * Function parse node whose {@link #create} builds a {@link ScanStartKeyFunction}.
+ */
+public class ScanStartKeyParseNode extends FunctionParseNode {
+
+  public ScanStartKeyParseNode(String name, List<ParseNode> children, BuiltInFunctionInfo info) {
+    super(name, children, info);
+  }
+
+  /**
+   * Wraps the given child expressions in a new {@link ScanStartKeyFunction}; the statement
+   * context is not consulted.
+   */
+  @Override
+  public Expression create(List<Expression> children, StatementContext context)
+    throws SQLException {
+    Expression function = new ScanStartKeyFunction(children);
+    return function;
+  }
+}
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/TotalSegmentsParseNode.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/TotalSegmentsParseNode.java
new file mode 100644
index 0000000000..b331bc314b
--- /dev/null
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/TotalSegmentsParseNode.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import java.sql.SQLException;
+import java.util.List;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.function.TotalSegmentsFunction;
+
+/**
+ * Function parse node whose {@link #create} builds a {@link TotalSegmentsFunction}.
+ */
+public class TotalSegmentsParseNode extends FunctionParseNode {
+
+  public TotalSegmentsParseNode(String name, List<ParseNode> children, BuiltInFunctionInfo info) {
+    super(name, children, info);
+  }
+
+  /**
+   * Wraps the given child expressions in a new {@link TotalSegmentsFunction}; the statement
+   * context is not consulted.
+   */
+  @Override
+  public Expression create(List<Expression> children, StatementContext context)
+    throws SQLException {
+    Expression function = new TotalSegmentsFunction(children);
+    return function;
+  }
+}
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTotalSegmentsFunctionIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTotalSegmentsFunctionIT.java
new file mode 100644
index 0000000000..2043b57783
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTotalSegmentsFunctionIT.java
@@ -0,0 +1,872 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Base class for TOTAL_SEGMENTS() tests with common test logic
+ */
+public abstract class BaseTotalSegmentsFunctionIT extends 
ParallelStatsDisabledIT {
+
+  protected String fullTableName;
+
+  /**
+   * Picks a fresh {@code SCHEMA.TABLE} name before every test so that tests do not share tables.
+   */
+  @Before
+  public void setUp() throws Exception {
+    // Arguments are evaluated left to right: the schema part is generated first, then the table.
+    fullTableName = String.join(".", generateUniqueName(), generateUniqueName());
+  }
+
+  /**
+   * Returns the SQL column type that the tests place after {@code PK} when building the CREATE
+   * TABLE statement of a single-column-key table. Supplied by the concrete test class.
+   */
+  protected abstract String getPrimaryKeyColumnType();
+
+  /**
+   * Returns the SQL column type that the tests place after {@code PK2}, the second primary key
+   * column of the composite-key table (the first column, {@code PK1}, is declared INTEGER).
+   * Supplied by the concrete test class.
+   */
+  protected abstract String getCompositeKeyColumnType();
+
+  /**
+   * Extracts the primary key value from a ResultSet for display purposes.
+   * NOTE(review): presumably expects {@code rs} to be positioned on a row; confirm against the
+   * implementations and callers.
+   */
+  protected abstract String extractPrimaryKeyValue(ResultSet rs) throws 
SQLException;
+
+  /**
+   * Requests more segments (95) than the pre-split table has regions and verifies that the
+   * returned rows mirror the table's region boundaries one-to-one, in order.
+   */
+  @Test
+  public void testTotalSegmentsWithSimpleTable() throws Exception {
+    Properties connectionProps = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+
+    try (Connection conn = DriverManager.getConnection(getUrl(), connectionProps)) {
+      // Pre-split the table on three keys.
+      String ddl = "CREATE TABLE " + fullTableName + " (PK " + getPrimaryKeyColumnType()
+        + " PRIMARY KEY, V1 VARCHAR) SPLIT ON ('B', 'D', 'F')";
+      conn.createStatement().execute(ddl);
+
+      // Region list as reported by the connection's query services: the expected result.
+      PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+      ConnectionQueryServices queryServices = pconn.getQueryServices();
+      byte[] physicalName = pconn.getTable(fullTableName).getPhysicalName().getBytes();
+      List<HRegionLocation> expectedRegions =
+        queryServices.getAllTableRegions(physicalName, 30000);
+
+      String query = "SELECT SCAN_START_KEY(), SCAN_END_KEY() FROM " + fullTableName
+        + " WHERE TOTAL_SEGMENTS() = 95";
+
+      try (PreparedStatement stmt = conn.prepareStatement(query);
+        ResultSet rs = stmt.executeQuery()) {
+
+        List<RegionInfo> returned = new ArrayList<>();
+        while (rs.next()) {
+          byte[] start = rs.getBytes(1);
+          byte[] end = rs.getBytes(2);
+          // Treat a null column value as the empty key so it compares equal to byte[0].
+          if (start == null) {
+            start = new byte[0];
+          }
+          if (end == null) {
+            end = new byte[0];
+          }
+          returned.add(new RegionInfo(start, end));
+        }
+
+        assertEquals("Number of regions should match", expectedRegions.size(), returned.size());
+
+        // Compare boundaries pairwise, region by region.
+        int index = 0;
+        for (HRegionLocation expected : expectedRegions) {
+          RegionInfo actual = returned.get(index);
+          byte[] wantedStart = expected.getRegion().getStartKey();
+          byte[] wantedEnd = expected.getRegion().getEndKey();
+
+          assertArrayEquals("Start key should match for region " + index, wantedStart,
+            actual.startKey);
+          assertArrayEquals("End key should match for region " + index, wantedEnd,
+            actual.endKey);
+          index++;
+        }
+      }
+    }
+  }
+
+  /**
+   * Same check as the simple-table test, but on a table with a two-column primary key that is
+   * pre-split on composite key values; 50 segments are requested.
+   */
+  @Test
+  public void testTotalSegmentsWithCompositeKey() throws Exception {
+    Properties connectionProps = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+
+    try (Connection conn = DriverManager.getConnection(getUrl(), connectionProps)) {
+      // Composite primary key (PK1, PK2) with three composite split points.
+      String ddl = "CREATE TABLE " + fullTableName + " (PK1 INTEGER NOT NULL, PK2 "
+        + getCompositeKeyColumnType() + " NOT NULL, V1 VARCHAR, "
+        + "CONSTRAINT PK PRIMARY KEY (PK1, PK2)) SPLIT ON ((1,'B'), (2,'A'), (3,'C'))";
+      conn.createStatement().execute(ddl);
+
+      // Region list as reported by the connection's query services: the expected result.
+      PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+      ConnectionQueryServices queryServices = pconn.getQueryServices();
+      byte[] physicalName = pconn.getTable(fullTableName).getPhysicalName().getBytes();
+      List<HRegionLocation> expectedRegions =
+        queryServices.getAllTableRegions(physicalName, 30000);
+
+      String query = "SELECT SCAN_START_KEY(), SCAN_END_KEY() FROM " + fullTableName
+        + " WHERE TOTAL_SEGMENTS() = 50";
+
+      try (PreparedStatement stmt = conn.prepareStatement(query);
+        ResultSet rs = stmt.executeQuery()) {
+
+        List<RegionInfo> returned = new ArrayList<>();
+        while (rs.next()) {
+          byte[] start = rs.getBytes(1);
+          byte[] end = rs.getBytes(2);
+          // Treat a null column value as the empty key so it compares equal to byte[0].
+          if (start == null) {
+            start = new byte[0];
+          }
+          if (end == null) {
+            end = new byte[0];
+          }
+          returned.add(new RegionInfo(start, end));
+        }
+
+        assertEquals("Number of regions should match", expectedRegions.size(), returned.size());
+
+        int index = 0;
+        for (HRegionLocation expected : expectedRegions) {
+          RegionInfo actual = returned.get(index);
+
+          assertArrayEquals("Start key should match for composite key region " + index,
+            expected.getRegion().getStartKey(), actual.startKey);
+          assertArrayEquals("End key should match for composite key region " + index,
+            expected.getRegion().getEndKey(), actual.endKey);
+          index++;
+        }
+      }
+    }
+  }
+
+  /**
+   * Verifies that a table created without split points yields exactly one segment row whose keys
+   * equal the single region's start and end keys.
+   */
+  @Test
+  public void testTotalSegmentsWithSingleRegion() throws Exception {
+    Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      // Create table without splits (single region)
+      String createSql = "CREATE TABLE " + fullTableName + " (" + "PK " + getPrimaryKeyColumnType()
+        + " PRIMARY KEY, " + "V1 VARCHAR" + ")";
+
+      conn.createStatement().execute(createSql);
+
+      // Get actual regions
+      PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class);
+      ConnectionQueryServices services = phoenixConn.getQueryServices();
+      byte[] physicalTableName = phoenixConn.getTable(fullTableName).getPhysicalName().getBytes();
+      List<HRegionLocation> actualRegions = services.getAllTableRegions(physicalTableName, 30000);
+
+      // Should have exactly one region
+      assertEquals("Single region table should have one region", 1, actualRegions.size());
+
+      // Execute TOTAL_SEGMENTS query
+      String sql = "SELECT SCAN_START_KEY(), SCAN_END_KEY() FROM " + fullTableName
+        + " WHERE TOTAL_SEGMENTS() = 1";
+
+      try (PreparedStatement stmt = conn.prepareStatement(sql);
+        ResultSet rs = stmt.executeQuery()) {
+
+        assertTrue("Should have exactly one result row", rs.next());
+
+        // A null column value is treated as the empty key so it compares equal to byte[0].
+        byte[] queryStartKey = rs.getBytes(1);
+        byte[] queryEndKey = rs.getBytes(2);
+        queryStartKey = queryStartKey == null ? new byte[0] : queryStartKey;
+        queryEndKey = queryEndKey == null ? new byte[0] : queryEndKey;
+
+        HRegionLocation singleRegion = actualRegions.get(0);
+        byte[] expectedStartKey = singleRegion.getRegion().getStartKey();
+        byte[] expectedEndKey = singleRegion.getRegion().getEndKey();
+
+        assertArrayEquals("Start key should match for single region", expectedStartKey,
+          queryStartKey);
+        assertArrayEquals("End key should match for single region", expectedEndKey, queryEndKey);
+
+        // Verify no more rows
+        assertFalse("Should have only one result row", rs.next());
+      }
+    }
+  }
+
+  @Test
+  public void testTotalSegmentsWithManyRegions() throws Exception {
+    Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      // Create table with many splits
+      String[] splits = { "'10'", "'20'", "'30'", "'40'", "'50'", "'60'", 
"'70'", "'80'", "'90'" };
+      String createSql = "CREATE TABLE " + fullTableName + " (" + "PK " + 
getPrimaryKeyColumnType()
+        + " PRIMARY KEY, " + "V1 VARCHAR" + ") SPLIT ON (" + String.join(", ", 
splits) + ")";
+
+      conn.createStatement().execute(createSql);
+
+      // Get actual regions
+      PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class);
+      ConnectionQueryServices services = phoenixConn.getQueryServices();
+      byte[] physicalTableName = 
phoenixConn.getTable(fullTableName).getPhysicalName().getBytes();
+      List<HRegionLocation> actualRegions = 
services.getAllTableRegions(physicalTableName, 30000);
+
+      // Should have 10 regions (9 splits + 1)
+      assertEquals("Should have 10 regions with 9 splits", 10, 
actualRegions.size());
+
+      // Execute TOTAL_SEGMENTS query
+      String sql = "SELECT SCAN_START_KEY(), SCAN_END_KEY() FROM " + 
fullTableName
+        + " WHERE TOTAL_SEGMENTS() = 100";
+
+      try (PreparedStatement stmt = conn.prepareStatement(sql);
+        ResultSet rs = stmt.executeQuery()) {
+
+        List<RegionInfo> queryRegions = new ArrayList<>();
+        while (rs.next()) {
+          byte[] startKey = rs.getBytes(1);
+          byte[] endKey = rs.getBytes(2);
+          startKey = startKey == null ? new byte[0] : startKey;
+          endKey = endKey == null ? new byte[0] : endKey;
+          queryRegions.add(new RegionInfo(startKey, endKey));
+        }
+
+        assertEquals("Should return 10 regions", 10, queryRegions.size());
+
+        // Verify all regions match
+        for (int i = 0; i < actualRegions.size(); i++) {
+          HRegionLocation actualRegion = actualRegions.get(i);
+          RegionInfo queryRegion = queryRegions.get(i);
+
+          assertArrayEquals("Start key should match for region " + i + " in 
many-region table",
+            actualRegion.getRegion().getStartKey(), queryRegion.startKey);
+          assertArrayEquals("End key should match for region " + i + " in 
many-region table",
+            actualRegion.getRegion().getEndKey(), queryRegion.endKey);
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testTotalSegmentsDoesNotGoToServer() throws Exception {
+    Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      // Create table
+      String createSql = "CREATE TABLE " + fullTableName + " (" + "PK " + 
getPrimaryKeyColumnType()
+        + " PRIMARY KEY, " + "V1 VARCHAR" + ") SPLIT ON ('B', 'D')";
+
+      conn.createStatement().execute(createSql);
+
+      // Insert some data so we can verify the query doesn't scan it
+      try (PreparedStatement insert =
+        conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES (?, 
?)")) {
+        insert.setString(1, "A");
+        insert.setString(2, "ValueA");
+        insert.executeUpdate();
+        insert.setString(1, "C");
+        insert.setString(2, "ValueC");
+        insert.executeUpdate();
+        insert.setString(1, "E");
+        insert.setString(2, "ValueE");
+        insert.executeUpdate();
+        conn.commit();
+      }
+
+      // Execute TOTAL_SEGMENTS query - this should NOT scan the inserted data
+      String sql = "SELECT SCAN_START_KEY(), SCAN_END_KEY() FROM " + 
fullTableName
+        + " WHERE TOTAL_SEGMENTS() = 42";
+
+      try (PreparedStatement stmt = conn.prepareStatement(sql);
+        ResultSet rs = stmt.executeQuery()) {
+
+        int regionCount = 0;
+        while (rs.next()) {
+          regionCount++;
+          byte[] startKey = rs.getBytes(1);
+          byte[] endKey = rs.getBytes(2);
+          startKey = startKey == null ? new byte[0] : startKey;
+          endKey = endKey == null ? new byte[0] : endKey;
+
+          // Verify that we get region boundary data, not row data
+          assertNotNull("Start key should not be null", startKey);
+          assertNotNull("End key should not be null", endKey);
+        }
+
+        // Should get 3 regions (2 splits + 1)
+        assertEquals("Should return 3 regions from client-side region scan", 
3, regionCount);
+      }
+    }
+  }
+
+  @Test
+  public void testTotalSegmentsWithRegionBucketing() throws Exception {
+    Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      // Create table with 12 regions (11 splits)
+      String[] splits =
+        { "'10'", "'20'", "'30'", "'40'", "'50'", "'60'", "'70'", "'80'", 
"'90'", "'A0'", "'B0'" };
+      String createSql = "CREATE TABLE " + fullTableName + " (" + "PK " + 
getPrimaryKeyColumnType()
+        + " PRIMARY KEY, " + "V1 VARCHAR" + ") SPLIT ON (" + String.join(", ", 
splits) + ")";
+
+      conn.createStatement().execute(createSql);
+
+      // Get actual regions
+      PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class);
+      ConnectionQueryServices services = phoenixConn.getQueryServices();
+      byte[] physicalTableName = 
phoenixConn.getTable(fullTableName).getPhysicalName().getBytes();
+      List<HRegionLocation> actualRegions = 
services.getAllTableRegions(physicalTableName, 30000);
+
+      // Should have 12 regions (11 splits + 1)
+      assertEquals("Should have 12 regions with 11 splits", 12, 
actualRegions.size());
+
+      // Test with TOTAL_SEGMENTS() = 4 (less than actual regions)
+      // This should bucket 12 regions into 4 segments
+      String sql = "SELECT SCAN_START_KEY(), SCAN_END_KEY() FROM " + 
fullTableName
+        + " WHERE TOTAL_SEGMENTS() = 4";
+
+      try (PreparedStatement stmt = conn.prepareStatement(sql);
+        ResultSet rs = stmt.executeQuery()) {
+
+        List<RegionInfo> segments = new ArrayList<>();
+        while (rs.next()) {
+          byte[] startKey = rs.getBytes(1);
+          byte[] endKey = rs.getBytes(2);
+          startKey = startKey == null ? new byte[0] : startKey;
+          endKey = endKey == null ? new byte[0] : endKey;
+          segments.add(new RegionInfo(startKey, endKey));
+        }
+
+        // Should get exactly 4 segments
+        assertEquals("Should return 4 segments when bucketing 12 regions", 4, 
segments.size());
+
+        // Verify bucketing logic: 12 regions / 4 buckets = 3 regions per 
bucket
+        // Bucket 0: regions 0,1,2 -> start=region[0].start, end=region[2].end
+        assertArrayEquals("First segment should start with first region's 
start key",
+          actualRegions.get(0).getRegion().getStartKey(), 
segments.get(0).startKey);
+        assertArrayEquals("First segment should end with third region's end 
key",
+          actualRegions.get(2).getRegion().getEndKey(), 
segments.get(0).endKey);
+
+        // Bucket 1: regions 3,4,5 -> start=region[3].start, end=region[5].end
+        assertArrayEquals("Second segment should start with fourth region's 
start key",
+          actualRegions.get(3).getRegion().getStartKey(), 
segments.get(1).startKey);
+        assertArrayEquals("Second segment should end with sixth region's end 
key",
+          actualRegions.get(5).getRegion().getEndKey(), 
segments.get(1).endKey);
+
+        // Bucket 2: regions 6,7,8 -> start=region[6].start, end=region[8].end
+        assertArrayEquals("Third segment should start with seventh region's 
start key",
+          actualRegions.get(6).getRegion().getStartKey(), 
segments.get(2).startKey);
+        assertArrayEquals("Third segment should end with ninth region's end 
key",
+          actualRegions.get(8).getRegion().getEndKey(), 
segments.get(2).endKey);
+
+        // Bucket 3: regions 9,10,11 -> start=region[9].start, 
end=region[11].end
+        assertArrayEquals("Fourth segment should start with tenth region's 
start key",
+          actualRegions.get(9).getRegion().getStartKey(), 
segments.get(3).startKey);
+        assertArrayEquals("Fourth segment should end with twelfth region's end 
key",
+          actualRegions.get(11).getRegion().getEndKey(), 
segments.get(3).endKey);
+      }
+    }
+  }
+
+  /**
+   * Requests 3 segments for a 10-region table and verifies the uneven split: the first segment
+   * spans four regions and the remaining two span three regions each.
+   */
+  @Test
+  public void testTotalSegmentsWithUnevenBucketing() throws Exception {
+    Properties connectionProps = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+
+    try (Connection conn = DriverManager.getConnection(getUrl(), connectionProps)) {
+      // Nine split points -> ten regions.
+      String[] splitPoints =
+        { "'10'", "'20'", "'30'", "'40'", "'50'", "'60'", "'70'", "'80'", "'90'" };
+      String ddl = "CREATE TABLE " + fullTableName + " (PK " + getPrimaryKeyColumnType()
+        + " PRIMARY KEY, V1 VARCHAR) SPLIT ON (" + String.join(", ", splitPoints) + ")";
+      conn.createStatement().execute(ddl);
+
+      PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+      ConnectionQueryServices queryServices = pconn.getQueryServices();
+      byte[] physicalName = pconn.getTable(fullTableName).getPhysicalName().getBytes();
+      List<HRegionLocation> regions = queryServices.getAllTableRegions(physicalName, 30000);
+
+      assertEquals("Should have 10 regions", 10, regions.size());
+
+      // 10 regions into 3 buckets: quotient 3, remainder 1 -> bucket sizes 4, 3, 3.
+      String query = "SELECT SCAN_START_KEY(), SCAN_END_KEY() FROM " + fullTableName
+        + " WHERE TOTAL_SEGMENTS() = 3";
+
+      try (PreparedStatement stmt = conn.prepareStatement(query);
+        ResultSet rs = stmt.executeQuery()) {
+
+        List<RegionInfo> segments = new ArrayList<>();
+        while (rs.next()) {
+          byte[] start = rs.getBytes(1);
+          byte[] end = rs.getBytes(2);
+          // Treat a null column value as the empty key so it compares equal to byte[0].
+          if (start == null) {
+            start = new byte[0];
+          }
+          if (end == null) {
+            end = new byte[0];
+          }
+          segments.add(new RegionInfo(start, end));
+        }
+
+        assertEquals("Should return 3 segments", 3, segments.size());
+
+        // Bucket 0 (4 regions) covers regions 0..3.
+        RegionInfo firstSegment = segments.get(0);
+        assertArrayEquals("First segment should start with first region's start key",
+          regions.get(0).getRegion().getStartKey(), firstSegment.startKey);
+        assertArrayEquals("First segment should end with fourth region's end key",
+          regions.get(3).getRegion().getEndKey(), firstSegment.endKey);
+
+        // Bucket 1 (3 regions) covers regions 4..6.
+        RegionInfo secondSegment = segments.get(1);
+        assertArrayEquals("Second segment should start with fifth region's start key",
+          regions.get(4).getRegion().getStartKey(), secondSegment.startKey);
+        assertArrayEquals("Second segment should end with seventh region's end key",
+          regions.get(6).getRegion().getEndKey(), secondSegment.endKey);
+
+        // Bucket 2 (3 regions) covers regions 7..9.
+        RegionInfo thirdSegment = segments.get(2);
+        assertArrayEquals("Third segment should start with eighth region's start key",
+          regions.get(7).getRegion().getStartKey(), thirdSegment.startKey);
+        assertArrayEquals("Third segment should end with tenth region's end key",
+          regions.get(9).getRegion().getEndKey(), thirdSegment.endKey);
+      }
+    }
+  }
+
+  /**
+   * Verifies that {@code TOTAL_SEGMENTS() = 6} on a table pre-split into 20 regions returns six
+   * segments whose start and end keys coincide with region boundaries, and that consecutive
+   * segments are returned in key order.
+   */
+  @Test
+  public void testTotalSegmentsWithLargeTableSmallSegments() throws Exception {
+    Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      // Create table with 20 regions (19 splits)
+      List<String> splitList = new ArrayList<>();
+      for (int i = 1; i < 20; i++) {
+        splitList.add(String.format("'%02d'", i * 5));
+      }
+      String createSql = "CREATE TABLE " + fullTableName + " (" + "PK " + getPrimaryKeyColumnType()
+        + " PRIMARY KEY, " + "V1 VARCHAR" + ") SPLIT ON (" + String.join(", ", splitList) + ")";
+
+      conn.createStatement().execute(createSql);
+
+      // Get actual regions
+      PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class);
+      ConnectionQueryServices services = phoenixConn.getQueryServices();
+      byte[] physicalTableName = phoenixConn.getTable(fullTableName).getPhysicalName().getBytes();
+      List<HRegionLocation> actualRegions = services.getAllTableRegions(physicalTableName, 30000);
+
+      assertEquals("Should have 20 regions", 20, actualRegions.size());
+
+      // Test with TOTAL_SEGMENTS() = 6 (20 regions -> 6 segments)
+      String sql = "SELECT SCAN_START_KEY(), SCAN_END_KEY() FROM " + fullTableName
+        + " WHERE TOTAL_SEGMENTS() = 6";
+
+      try (PreparedStatement stmt = conn.prepareStatement(sql);
+        ResultSet rs = stmt.executeQuery()) {
+
+        List<RegionInfo> segments = new ArrayList<>();
+        while (rs.next()) {
+          byte[] startKey = rs.getBytes(1);
+          byte[] endKey = rs.getBytes(2);
+          // A null key is treated as an empty key
+          startKey = startKey == null ? new byte[0] : startKey;
+          endKey = endKey == null ? new byte[0] : endKey;
+          segments.add(new RegionInfo(startKey, endKey));
+        }
+
+        assertEquals("Should return 6 segments when bucketing 20 regions", 6, segments.size());
+
+        // Verify detailed bucketing logic: 20 regions / 6 segments
+        // q = 20/6 = 3, r = 20%6 = 2
+        // First 2 segments get 4 regions each, remaining 4 segments get 3 regions each.
+        // Each entry is {first region index, last region index} for the segment at that position.
+        int[][] expectedRegionRanges =
+          { { 0, 3 }, { 4, 7 }, { 8, 10 }, { 11, 13 }, { 14, 16 }, { 17, 19 } };
+        for (int i = 0; i < expectedRegionRanges.length; i++) {
+          int firstRegion = expectedRegionRanges[i][0];
+          int lastRegion = expectedRegionRanges[i][1];
+          assertArrayEquals(
+            "Segment " + i + " should start with region " + firstRegion + "'s start key",
+            actualRegions.get(firstRegion).getRegion().getStartKey(), segments.get(i).startKey);
+          assertArrayEquals(
+            "Segment " + i + " should end with region " + lastRegion + "'s end key",
+            actualRegions.get(lastRegion).getRegion().getEndKey(), segments.get(i).endKey);
+        }
+
+        // Verify that consecutive segments are ordered and do not overlap:
+        // the end key of segment[i] must be <= the start key of segment[i+1]
+        // (they are equal for adjacent segments).
+        for (int i = 0; i < segments.size() - 1; i++) {
+          assertTrue("Segments should be ordered correctly",
+            Bytes.compareTo(segments.get(i).endKey, segments.get(i + 1).startKey) <= 0);
+        }
+      }
+    }
+  }
+
+  /**
+   * Verifies that {@code TOTAL_SEGMENTS() = 1} on a table split into 5 regions returns exactly
+   * one row whose keys span from the first region's start key to the last region's end key.
+   */
+  @Test
+  public void testTotalSegmentsEdgeCase() throws Exception {
+    Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      // Create table with 5 regions
+      String[] splits = { "'20'", "'40'", "'60'", "'80'" };
+      String createSql = "CREATE TABLE " + fullTableName + " (" + "PK " + getPrimaryKeyColumnType()
+        + " PRIMARY KEY, " + "V1 VARCHAR" + ") SPLIT ON (" + String.join(", ", splits) + ")";
+
+      conn.createStatement().execute(createSql);
+
+      // Test edge case: TOTAL_SEGMENTS() = 1 (all regions combined into one segment)
+      String sql = "SELECT SCAN_START_KEY(), SCAN_END_KEY() FROM " + fullTableName
+        + " WHERE TOTAL_SEGMENTS() = 1";
+
+      try (PreparedStatement stmt = conn.prepareStatement(sql);
+        ResultSet rs = stmt.executeQuery()) {
+
+        assertTrue("Should have exactly one result", rs.next());
+
+        byte[] startKey = rs.getBytes(1);
+        byte[] endKey = rs.getBytes(2);
+        // A null key is treated as an empty key
+        startKey = startKey == null ? new byte[0] : startKey;
+        endKey = endKey == null ? new byte[0] : endKey;
+
+        // Get actual regions for verification
+        PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class);
+        ConnectionQueryServices services = phoenixConn.getQueryServices();
+        byte[] physicalTableName =
+          phoenixConn.getTable(fullTableName).getPhysicalName().getBytes();
+        List<HRegionLocation> actualRegions =
+          services.getAllTableRegions(physicalTableName, 30000);
+
+        // Single segment should span from first region's start to last region's end
+        assertArrayEquals("Single segment should start with first region's start key",
+          actualRegions.get(0).getRegion().getStartKey(), startKey);
+        assertArrayEquals("Single segment should end with last region's end key",
+          actualRegions.get(actualRegions.size() - 1).getRegion().getEndKey(), endKey);
+
+        // Should have no more results
+        assertFalse("Should have only one result", rs.next());
+      }
+    }
+  }
+
+  @Test
+  public void testTotalSegmentsWithDataDistribution() throws Exception {
+    Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      // Create table with 15 regions (14 splits) for good bucketing examples
+      String[] splits = { "'10'", "'20'", "'30'", "'40'", "'50'", "'60'", 
"'70'", "'80'", "'90'",
+        "'A0'", "'B0'", "'C0'", "'D0'", "'E0'" };
+      String createSql = "CREATE TABLE " + fullTableName + " (" + "PK " + 
getPrimaryKeyColumnType()
+        + " PRIMARY KEY, " + "V1 VARCHAR, V2 INTEGER" + ") SPLIT ON (" + 
String.join(", ", splits)
+        + ")";
+
+      conn.createStatement().execute(createSql);
+
+      // Insert data across different regions
+      try (PreparedStatement insert =
+        conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES (?, ?, 
?)")) {
+        String[] testKeys =
+          { "05", "15", "25", "35", "45", "55", "65", "75", "85", "95", "A5", 
"B5", "C5", "D5",
+            "E5", "F5", "2X2903hg", "5Ywoe", "EeEe45", "20", "50", "500", 
"90", "10" };
+        for (int i = 0; i < testKeys.length; i++) {
+          insert.setString(1, testKeys[i]);
+          insert.setString(2, "Value" + i);
+          insert.setInt(3, i * 10);
+          insert.executeUpdate();
+        }
+        conn.commit();
+      }
+
+      // Get actual regions
+      PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class);
+      ConnectionQueryServices services = phoenixConn.getQueryServices();
+      byte[] physicalTableName = 
phoenixConn.getTable(fullTableName).getPhysicalName().getBytes();
+      List<HRegionLocation> actualRegions = 
services.getAllTableRegions(physicalTableName, 30000);
+
+      assertEquals("Should have 15 regions", 15, actualRegions.size());
+
+      // Test case 1: Bucket 15 regions into 5 segments (3 regions per segment)
+      String sql1 = "SELECT SCAN_START_KEY(), SCAN_END_KEY() FROM " + 
fullTableName
+        + " WHERE TOTAL_SEGMENTS() = 5";
+
+      try (PreparedStatement stmt = conn.prepareStatement(sql1);
+        ResultSet rs = stmt.executeQuery()) {
+
+        List<RegionInfo> segments = new ArrayList<>();
+        while (rs.next()) {
+          byte[] startKey = rs.getBytes(1);
+          byte[] endKey = rs.getBytes(2);
+          startKey = startKey == null ? new byte[0] : startKey;
+          endKey = endKey == null ? new byte[0] : endKey;
+          segments.add(new RegionInfo(startKey, endKey));
+        }
+
+        assertEquals("Should return 5 segments", 5, segments.size());
+
+        // Verify 15/5 = 3 regions per segment
+        // Segment 0: regions 0,1,2
+        assertArrayEquals("First segment should start with first region",
+          actualRegions.get(0).getRegion().getStartKey(), 
segments.get(0).startKey);
+        assertArrayEquals("First segment should end with third region",
+          actualRegions.get(2).getRegion().getEndKey(), 
segments.get(0).endKey);
+
+        // Segment 4: regions 12,13,14
+        assertArrayEquals("Last segment should start with thirteenth region",
+          actualRegions.get(12).getRegion().getStartKey(), 
segments.get(4).startKey);
+        assertArrayEquals("Last segment should end with fifteenth region",
+          actualRegions.get(14).getRegion().getEndKey(), 
segments.get(4).endKey);
+      }
+
+      // Test case 2: Bucket 15 regions into 7 segments (uneven distribution)
+      // 15/7 = 2 remainder 1, so first 1 bucket gets 3 regions, rest get 2
+      String sql2 = "SELECT SCAN_START_KEY(), SCAN_END_KEY() FROM " + 
fullTableName
+        + " WHERE TOTAL_SEGMENTS() = 7";
+
+      try (PreparedStatement stmt = conn.prepareStatement(sql2);
+        ResultSet rs = stmt.executeQuery()) {
+
+        List<RegionInfo> segments = new ArrayList<>();
+        while (rs.next()) {
+          byte[] startKey = rs.getBytes(1);
+          byte[] endKey = rs.getBytes(2);
+          startKey = startKey == null ? new byte[0] : startKey;
+          endKey = endKey == null ? new byte[0] : endKey;
+          segments.add(new RegionInfo(startKey, endKey));
+        }
+
+        assertEquals("Should return 7 segments for uneven bucketing", 7, 
segments.size());
+
+        assertArrayEquals("First segment should span 3 regions",
+          actualRegions.get(2).getRegion().getEndKey(), 
segments.get(0).endKey);
+
+        assertArrayEquals("Second segment should start with fourth region",
+          actualRegions.get(3).getRegion().getStartKey(), 
segments.get(1).startKey);
+        assertArrayEquals("Second segment should span 2 regions",
+          actualRegions.get(4).getRegion().getEndKey(), 
segments.get(1).endKey);
+      }
+
+      // Full table scan
+      String dataQuery = "SELECT PK, V1, V2 FROM " + fullTableName;
+      int totalRowsFound = 0;
+      List<String> totalData = new ArrayList<>();
+      try (PreparedStatement dataStmt = conn.prepareStatement(dataQuery)) {
+        try (ResultSet dataRs = dataStmt.executeQuery()) {
+          while (dataRs.next()) {
+            String pk = extractPrimaryKeyValue(dataRs);
+            String v1 = dataRs.getString(2);
+            int v2 = dataRs.getInt(3);
+            totalRowsFound++;
+            totalData.add(String.format("PK=%s, V1=%s, V2=%d", pk, v1, v2));
+          }
+        }
+      }
+      assertEquals("Total 24 rows", 24, totalRowsFound);
+
+      for (int segment = 1; segment < 20; segment++) {
+        String sql = "SELECT SCAN_START_KEY(), SCAN_END_KEY() FROM " + 
fullTableName
+          + " WHERE TOTAL_SEGMENTS() = " + segment;
+        List<String> segmentData = new ArrayList<>();
+        assertEquals("Total 24 rows", 24, getTotalRowsFound(conn, sql, 
segmentData));
+        assertEquals("All rows should be matching with full table scan", 
totalData, segmentData);
+      }
+
+      // Test count(*) queries for each segment and verify total count
+      for (int segment = 1; segment <= 20; segment++) {
+        String segmentSql = "SELECT SCAN_START_KEY(), SCAN_END_KEY() FROM " + 
fullTableName
+          + " WHERE TOTAL_SEGMENTS() = " + segment;
+
+        int totalCountFromSegments = getTotalRowCountFromSegments(conn, 
segmentSql);
+
+        assertEquals(
+          "Total count from segments should match expected count for " + 
segment + " segments", 24,
+          totalCountFromSegments);
+      }
+    }
+  }
+
+  @Test
+  public void testTotalSegmentsWithInvalidValues() throws Exception {
+    Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      // Create simple table
+      String createSql = "CREATE TABLE " + fullTableName + " (" + "PK " + 
getPrimaryKeyColumnType()
+        + " PRIMARY KEY, " + "V1 VARCHAR" + ")";
+
+      conn.createStatement().execute(createSql);
+
+      // Test with TOTAL_SEGMENTS() = 0 (should throw exception)
+      String sql0 = "SELECT SCAN_START_KEY(), SCAN_END_KEY() FROM " + 
fullTableName
+        + " WHERE TOTAL_SEGMENTS() = 0";
+
+      try (PreparedStatement stmt = conn.prepareStatement(sql0)) {
+        stmt.executeQuery();
+        fail("Expected SQLException for TOTAL_SEGMENTS() = 0");
+      } catch (SQLException e) {
+        assertEquals("Expected error code for invalid TOTAL_SEGMENTS value", 
221, e.getErrorCode());
+        assertTrue("Expected error message about TOTAL_SEGMENTS value",
+          e.getMessage().contains("TOTAL_SEGMENTS() value must be greater than 
0"));
+      }
+
+      // Test with TOTAL_SEGMENTS() = -1 (should throw exception)
+      String sqlNegative = "SELECT SCAN_START_KEY(), SCAN_END_KEY() FROM " + 
fullTableName
+        + " WHERE TOTAL_SEGMENTS() = -1";
+
+      try (PreparedStatement stmt = conn.prepareStatement(sqlNegative)) {
+        stmt.executeQuery();
+        fail("Expected SQLException for TOTAL_SEGMENTS() = -1");
+      } catch (SQLException e) {
+        assertEquals("Expected error code for invalid TOTAL_SEGMENTS value", 
221, e.getErrorCode());
+        assertTrue("Expected error message about TOTAL_SEGMENTS value",
+          e.getMessage().contains("TOTAL_SEGMENTS() value must be greater than 
0"));
+      }
+
+      // Test with TOTAL_SEGMENTS() = -100 (should throw exception)
+      String sqlNegativeLarge = "SELECT SCAN_START_KEY(), SCAN_END_KEY() FROM 
" + fullTableName
+        + " WHERE TOTAL_SEGMENTS() = -100";
+
+      try (PreparedStatement stmt = conn.prepareStatement(sqlNegativeLarge)) {
+        stmt.executeQuery();
+        fail("Expected SQLException for TOTAL_SEGMENTS() = -100");
+      } catch (SQLException e) {
+        assertEquals("Expected error code for invalid TOTAL_SEGMENTS value", 
221, e.getErrorCode());
+        assertTrue("Expected error message about TOTAL_SEGMENTS value",
+          e.getMessage().contains("TOTAL_SEGMENTS() value must be greater than 
0"));
+      }
+
+      // Test with TOTAL_SEGMENTS() = 1 (should work fine)
+      String sql1 = "SELECT SCAN_START_KEY(), SCAN_END_KEY() FROM " + 
fullTableName
+        + " WHERE TOTAL_SEGMENTS() = 1";
+
+      try (PreparedStatement stmt = conn.prepareStatement(sql1);
+        ResultSet rs = stmt.executeQuery()) {
+        assertTrue("Should have exactly one result for TOTAL_SEGMENTS() = 1", 
rs.next());
+        assertFalse("Should have only one result", rs.next());
+      }
+    }
+  }
+
+  /**
+   * Runs the given segment query, then for each returned (start key, end key) pair reads
+   * {@code PK, V1, V2} from the test table restricted to that pair via {@code SCAN_START_KEY()}
+   * and {@code SCAN_END_KEY()}.
+   * @param conn        connection used for all queries
+   * @param sql         query whose first two columns are the segment start and end keys
+   * @param segmentData list that receives one formatted line per row read, in read order
+   * @return the number of rows read across all segments
+   * @throws SQLException if any query fails
+   */
+  protected int getTotalRowsFound(Connection conn, String sql, List<String> segmentData)
+    throws SQLException {
+    // The per-segment query text is the same for every segment; only the bound keys change
+    final String perSegmentQuery = "SELECT PK, V1, V2 FROM " + fullTableName
+      + " WHERE SCAN_START_KEY() = ? AND SCAN_END_KEY() = ?";
+    int rowCount = 0;
+    try (PreparedStatement segmentStmt = conn.prepareStatement(sql);
+      ResultSet segmentRs = segmentStmt.executeQuery()) {
+      while (segmentRs.next()) {
+        byte[] lowerBound = segmentRs.getBytes(1);
+        byte[] upperBound = segmentRs.getBytes(2);
+
+        try (PreparedStatement rowStmt = conn.prepareStatement(perSegmentQuery)) {
+          rowStmt.setBytes(1, lowerBound);
+          rowStmt.setBytes(2, upperBound);
+          try (ResultSet rowRs = rowStmt.executeQuery()) {
+            while (rowRs.next()) {
+              String key = extractPrimaryKeyValue(rowRs);
+              String firstValue = rowRs.getString(2);
+              int secondValue = rowRs.getInt(3);
+              segmentData.add(String.format("PK=%s, V1=%s, V2=%d", key, firstValue, secondValue));
+              rowCount++;
+            }
+          }
+        }
+      }
+    }
+    return rowCount;
+  }
+
+  protected int getTotalRowCountFromSegments(Connection conn, String 
segmentSql)
+    throws SQLException {
+    int totalCountFromSegments = 0;
+    try (PreparedStatement segmentStmt = conn.prepareStatement(segmentSql);
+      ResultSet segmentRs = segmentStmt.executeQuery()) {
+
+      while (segmentRs.next()) {
+        byte[] segmentStart = segmentRs.getBytes(1);
+        byte[] segmentEnd = segmentRs.getBytes(2);
+
+        // Count rows in this segment
+        String countQuery = "SELECT COUNT(*) FROM " + fullTableName
+          + " WHERE SCAN_START_KEY() = ? AND SCAN_END_KEY() = ?";
+
+        try (PreparedStatement countStmt = conn.prepareStatement(countQuery)) {
+          countStmt.setBytes(1, segmentStart);
+          countStmt.setBytes(2, segmentEnd);
+          try (ResultSet countRs = countStmt.executeQuery()) {
+            if (countRs.next()) {
+              totalCountFromSegments += countRs.getInt(1);
+            }
+          }
+        }
+      }
+    }
+    return totalCountFromSegments;
+  }
+
+  /**
+   * Helper class to store region information for comparison
+   */
+  protected static class RegionInfo {
+    final byte[] startKey;
+    final byte[] endKey;
+
+    RegionInfo(byte[] startKey, byte[] endKey) {
+      this.startKey = startKey;
+      this.endKey = endKey;
+    }
+
+    @Override
+    public String toString() {
+      return "RegionInfo{startKey=" + Arrays.toString(startKey) + ", endKey="
+        + Arrays.toString(endKey) + "}";
+    }
+  }
+}
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ScanBoundaryFunction2IT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ScanBoundaryFunction2IT.java
new file mode 100644
index 0000000000..2a22194746
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ScanBoundaryFunction2IT.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(ParallelStatsDisabledTest.class)
+public class ScanBoundaryFunction2IT extends ParallelStatsDisabledIT {
+
+  private String tableName;
+  private String fullTableName;
+
+  /**
+   * Creates a uniquely named table with the composite primary key (PK1 INTEGER, PK2 VARCHAR) and
+   * upserts 30 rows into it before each test.
+   */
+  @Before
+  public void setUp() throws Exception {
+    tableName = generateUniqueName();
+    fullTableName = "\"" + tableName + "\"";
+
+    Properties connectionProps = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+    try (Connection connection = DriverManager.getConnection(getUrl(), connectionProps)) {
+      // Composite primary key: PK1 (INTEGER) followed by PK2 (VARCHAR)
+      String ddl = "CREATE TABLE " + fullTableName
+        + " (PK1 INTEGER NOT NULL, PK2 VARCHAR NOT NULL, COL1 VARCHAR, COL2 INTEGER, "
+        + "CONSTRAINT PK PRIMARY KEY (PK1, PK2))";
+      connection.createStatement().execute(ddl);
+
+      String dml = "UPSERT INTO " + fullTableName + " (PK1, PK2, COL1, COL2) VALUES (?, ?, ?, ?)";
+      try (PreparedStatement upsert = connection.prepareStatement(dml)) {
+        // Rows 1..30: PK1 cycles through 1, 2, 3 (10 rows each); PK2 is KEY_001 .. KEY_030
+        for (int offset = 0; offset < 30; offset++) {
+          int rowNumber = offset + 1;
+          upsert.setInt(1, offset % 3 + 1);
+          upsert.setString(2, String.format("KEY_%03d", rowNumber));
+          upsert.setString(3, "Value_" + rowNumber);
+          upsert.setInt(4, rowNumber * 10);
+          upsert.executeUpdate();
+        }
+      }
+      connection.commit();
+    }
+  }
+
+  /**
+   * Checks that {@code SCAN_START_KEY() = ?} on its own sets the scan start row to the bound
+   * key, leaves the stop row empty, and returns the expected 17 rows.
+   */
+  @Test
+  public void testScanStartKeyOnlyWithCompositePK() throws Exception {
+    Properties connectionProps = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+    try (Connection connection = DriverManager.getConnection(getUrl(), connectionProps)) {
+      String sql = "SELECT PK1, PK2 FROM " + fullTableName + " WHERE SCAN_START_KEY() = ?";
+      try (PreparedStatement stmt = connection.prepareStatement(sql)) {
+        // Start key: concatenated encoding of PK1=2 and PK2='KEY_010'
+        byte[] startKey = createCompositeKeyBytes(2, "KEY_010");
+        stmt.setBytes(1, startKey);
+
+        // The compiled scan must carry the start key and an empty stop row
+        QueryPlan queryPlan = stmt.unwrap(PhoenixPreparedStatement.class).optimizeQuery(sql);
+        Scan compiledScan = queryPlan.getContext().getScan();
+        assertArrayEquals("SCAN_START_KEY should set scan start row to composite key", startKey,
+          compiledScan.getStartRow());
+        assertEquals("SCAN_END_KEY not specified, so stop row should be empty", 0,
+          compiledScan.getStopRow().length);
+
+        // Collect the rows as "PK1:PK2" and check count, first and last
+        List<String> rows = new ArrayList<>();
+        try (ResultSet rs = stmt.executeQuery()) {
+          while (rs.next()) {
+            int pk1 = rs.getInt(1);
+            String pk2 = rs.getString(2);
+            rows.add(pk1 + ":" + pk2);
+          }
+        }
+        assertEquals(17, rows.size());
+        assertEquals("2:KEY_011", rows.get(0));
+        assertEquals("3:KEY_030", rows.get(16));
+      }
+    }
+  }
+
+  /**
+   * Checks that {@code SCAN_END_KEY() = ?} on its own sets the scan stop row to the bound key,
+   * leaves the start row empty, and returns the expected 16 rows.
+   */
+  @Test
+  public void testScanEndKeyOnlyWithCompositePK() throws Exception {
+    Properties connectionProps = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+    try (Connection connection = DriverManager.getConnection(getUrl(), connectionProps)) {
+      String sql = "SELECT PK1, PK2 FROM " + fullTableName + " WHERE SCAN_END_KEY() = ?";
+      try (PreparedStatement stmt = connection.prepareStatement(sql)) {
+        // End key: concatenated encoding of PK1=2 and PK2='KEY_020'
+        byte[] endKey = createCompositeKeyBytes(2, "KEY_020");
+        stmt.setBytes(1, endKey);
+
+        // The compiled scan must carry an empty start row and the end key as stop row
+        QueryPlan queryPlan = stmt.unwrap(PhoenixPreparedStatement.class).optimizeQuery(sql);
+        Scan compiledScan = queryPlan.getContext().getScan();
+        assertEquals("SCAN_START_KEY not specified, so start row should be empty", 0,
+          compiledScan.getStartRow().length);
+        assertArrayEquals("SCAN_END_KEY should set scan stop row to composite key", endKey,
+          compiledScan.getStopRow());
+
+        // Collect the rows as "PK1:PK2" and check count, first and last
+        List<String> rows = new ArrayList<>();
+        try (ResultSet rs = stmt.executeQuery()) {
+          while (rs.next()) {
+            int pk1 = rs.getInt(1);
+            String pk2 = rs.getString(2);
+            rows.add(pk1 + ":" + pk2);
+          }
+        }
+        assertEquals(16, rows.size());
+        assertEquals("1:KEY_001", rows.get(0));
+        assertEquals("2:KEY_017", rows.get(15));
+      }
+    }
+  }
+
+  /**
+   * Checks that {@code SCAN_START_KEY() = ? AND SCAN_END_KEY() = ?} sets both the scan start row
+   * and the scan stop row to the bound keys and returns the expected 16 rows.
+   */
+  @Test
+  public void testScanBothBoundariesWithCompositePK() throws Exception {
+    Properties connectionProps = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+    try (Connection connection = DriverManager.getConnection(getUrl(), connectionProps)) {
+      String sql = "SELECT PK1, PK2 FROM " + fullTableName
+        + " WHERE SCAN_START_KEY() = ? AND SCAN_END_KEY() = ?";
+      try (PreparedStatement stmt = connection.prepareStatement(sql)) {
+        // Range from (1, 'KEY_015') up to (3, 'KEY_005')
+        byte[] lowerKey = createCompositeKeyBytes(1, "KEY_015");
+        byte[] upperKey = createCompositeKeyBytes(3, "KEY_005");
+        stmt.setBytes(1, lowerKey);
+        stmt.setBytes(2, upperKey);
+
+        // The compiled scan must carry both keys unchanged
+        QueryPlan queryPlan = stmt.unwrap(PhoenixPreparedStatement.class).optimizeQuery(sql);
+        Scan compiledScan = queryPlan.getContext().getScan();
+        assertArrayEquals("SCAN_START_KEY should set scan start row to first composite key",
+          lowerKey, compiledScan.getStartRow());
+        assertArrayEquals("SCAN_END_KEY should set scan stop row to second composite key",
+          upperKey, compiledScan.getStopRow());
+
+        // Collect the rows as "PK1:PK2" and check count, first and last
+        List<String> rows = new ArrayList<>();
+        try (ResultSet rs = stmt.executeQuery()) {
+          while (rs.next()) {
+            int pk1 = rs.getInt(1);
+            String pk2 = rs.getString(2);
+            rows.add(pk1 + ":" + pk2);
+          }
+        }
+        assertEquals(16, rows.size());
+        assertEquals("1:KEY_016", rows.get(0));
+        assertEquals("3:KEY_003", rows.get(15));
+      }
+    }
+  }
+
+  /**
+   * Checks how {@code SCAN_END_KEY() = ?} combines with the primary key filter
+   * {@code PK1 = ? AND PK2 > ?}. The query has no {@code SCAN_START_KEY()}, so the expected
+   * start row comes from the PK filter; the expected stop row is the encoding of PK1 = 3 rather
+   * than the longer key bound to {@code SCAN_END_KEY()}.
+   */
+  @Test
+  public void testScanEndWithPkFilters1() throws Exception {
+    Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      String sql = "SELECT PK1, PK2, COL2 FROM " + fullTableName
+        + " WHERE SCAN_END_KEY() = ? AND (PK1 = ? AND PK2 > ?)";
+      try (PreparedStatement stmt = conn.prepareStatement(sql)) {
+        byte[] endKeyBytes = createCompositeKeyBytes(3, "KEY_030");
+        stmt.setBytes(1, endKeyBytes);
+        stmt.setInt(2, 2); // Only rows with PK1 = 2
+        stmt.setString(3, "KEY_005");
+
+        // Verify scan boundaries are set correctly
+        PhoenixPreparedStatement pstmt = stmt.unwrap(PhoenixPreparedStatement.class);
+        QueryPlan plan = pstmt.optimizeQuery(sql);
+        Scan scan = plan.getContext().getScan();
+        // Expected start row: the (2, 'KEY_005') key followed by one extra byte of value 1
+        byte[] combinedKeyBytes = createCompositeKeyBytes(2, "KEY_005");
+        byte[] expectedScanStartKeyBytes = new byte[combinedKeyBytes.length + 1];
+        System.arraycopy(combinedKeyBytes, 0, expectedScanStartKeyBytes, 0,
+          combinedKeyBytes.length);
+        expectedScanStartKeyBytes[expectedScanStartKeyBytes.length - 1] = (byte) 1;
+        assertArrayEquals("PK1 = ? AND PK2 > ? filter should determine the scan start row",
+          expectedScanStartKeyBytes, scan.getStartRow());
+        // presumably the exclusive upper bound of the PK1 = 2 key range - TODO confirm
+        assertArrayEquals("Scan stop row should come from the PK1 filter, not from SCAN_END_KEY",
+          PInteger.INSTANCE.toBytes(3), scan.getStopRow());
+
+        // Verify results
+        try (ResultSet rs = stmt.executeQuery()) {
+          List<String> results = new ArrayList<>();
+          while (rs.next()) {
+            int pk1 = rs.getInt(1);
+            String pk2 = rs.getString(2);
+            results.add(pk1 + ":" + pk2);
+
+            // Verify all results have PK1 = 2
+            assertEquals("All results should have PK1 = 2", 2, pk1);
+          }
+
+          assertEquals(8, results.size());
+          assertEquals("2:KEY_008", results.get(0));
+          assertEquals("2:KEY_029", results.get(7));
+        }
+      }
+    }
+  }
+
+  /**
+   * Checks how {@code SCAN_END_KEY() = ?} combines with the primary key filter {@code PK1 > ?}.
+   * The query has no {@code SCAN_START_KEY()}, so the expected start row (the encoding of
+   * PK1 = 3) comes from the PK filter, while the stop row is the key bound to
+   * {@code SCAN_END_KEY()}.
+   */
+  @Test
+  public void testScanEndWithPkFilters2() throws Exception {
+    Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      String sql =
+        "SELECT PK1, PK2, COL2 FROM " + fullTableName + " WHERE SCAN_END_KEY() = ? AND (PK1 > ?)";
+      try (PreparedStatement stmt = conn.prepareStatement(sql)) {
+        byte[] endKeyBytes = createCompositeKeyBytes(3, "KEY_025");
+        stmt.setBytes(1, endKeyBytes);
+        stmt.setInt(2, 2);
+
+        // Verify scan boundaries are set correctly
+        PhoenixPreparedStatement pstmt = stmt.unwrap(PhoenixPreparedStatement.class);
+        QueryPlan plan = pstmt.optimizeQuery(sql);
+        Scan scan = plan.getContext().getScan();
+        assertArrayEquals("PK1 > ? filter should determine the scan start row",
+          PInteger.INSTANCE.toBytes(3), scan.getStartRow());
+        assertArrayEquals("SCAN_END_KEY should set scan stop row with SCAN_END_KEY", endKeyBytes,
+          scan.getStopRow());
+
+        // Verify results
+        try (ResultSet rs = stmt.executeQuery()) {
+          List<String> results = new ArrayList<>();
+          while (rs.next()) {
+            int pk1 = rs.getInt(1);
+            String pk2 = rs.getString(2);
+            results.add(pk1 + ":" + pk2);
+          }
+
+          assertEquals(8, results.size());
+          assertEquals("3:KEY_003", results.get(0));
+          assertEquals("3:KEY_024", results.get(7));
+        }
+      }
+    }
+  }
+
+  /**
+   * Checks that comparing {@code SCAN_START_KEY()} with {@code !=} is rejected: executing the
+   * query is expected to fail with an error whose message names an
+   * {@code InstantiationException} for {@code ScanStartKeyFunction}.
+   */
+  @Test
+  public void testScanBoundariesFailWithCompositePK() throws Exception {
+    Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      // Test that SCAN_START_KEY() used with '!=' fails on the composite PK table
+      String sql = "SELECT PK1, PK2 FROM " + fullTableName + " WHERE SCAN_START_KEY() != ?";
+      try (PreparedStatement stmt = conn.prepareStatement(sql)) {
+        stmt.setBytes(1, createCompositeKeyBytes(1, "KEY_010"));
+
+        try (ResultSet rs = stmt.executeQuery()) {
+          rs.next();
+          // AssertionError is not an Exception, so it escapes the catch below and fails the test
+          throw new AssertionError("Should not reach here");
+        } catch (Exception e) {
+          // getMessage() may be null; fail the assertion instead of throwing a
+          // NullPointerException that would hide the real cause
+          String message = e.getMessage();
+          assertTrue("ScanStartKeyFunction should not be instantiated",
+            message != null && message.contains("java.lang.InstantiationException: "
+              + "org.apache.phoenix.expression.function.ScanStartKeyFunction"));
+        }
+      }
+    }
+  }
+
+  /**
+   * Builds a test row key by concatenating the {@code PInteger} bytes of {@code pk1} with the
+   * {@code PVarchar} bytes of {@code pk2}, in that order.
+   * @param pk1 value for the leading INTEGER key column
+   * @param pk2 value for the trailing VARCHAR key column
+   * @return a new array holding the PK1 bytes followed by the PK2 bytes
+   */
+  private byte[] createCompositeKeyBytes(int pk1, String pk2) {
+    byte[][] parts = { PInteger.INSTANCE.toBytes(pk1), PVarchar.INSTANCE.toBytes(pk2) };
+
+    byte[] key = new byte[parts[0].length + parts[1].length];
+    int offset = 0;
+    for (byte[] part : parts) {
+      System.arraycopy(part, 0, key, offset, part.length);
+      offset += part.length;
+    }
+    return key;
+  }
+}
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ScanBoundaryFunctionIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ScanBoundaryFunctionIT.java
new file mode 100644
index 0000000000..60b7856bc4
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ScanBoundaryFunctionIT.java
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(ParallelStatsDisabledTest.class)
+public class ScanBoundaryFunctionIT extends ParallelStatsDisabledIT {
+
+  private String tableName;
+  private String fullTableName;
+
+  /**
+   * Creates a uniquely named table (PK VARCHAR primary key, COL1 VARCHAR,
+   * COL2 INTEGER) and loads 30 rows before each test: PK is KEY_001..KEY_030,
+   * COL1 is "Value_" + i and COL2 is i * 10.
+   */
+  @Before
+  public void setUp() throws Exception {
+    tableName = generateUniqueName();
+    // Quote the generated name so it is used exactly as generated
+    fullTableName = "\"" + tableName + "\"";
+
+    Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      // Create table with VARCHAR primary key
+      String createTableSql = "CREATE TABLE " + fullTableName + " ("
+        + "PK VARCHAR NOT NULL PRIMARY KEY, " + "COL1 VARCHAR, " + "COL2 INTEGER" + ")";
+      // try-with-resources so the DDL statement is closed instead of leaked
+      try (PreparedStatement ddl = conn.prepareStatement(createTableSql)) {
+        ddl.execute();
+      }
+
+      // Insert 30 rows with predictable VARCHAR keys
+      String upsertSql =
+        "UPSERT INTO " + fullTableName + " (PK, COL1, COL2) VALUES (?, ?, ?)";
+      try (PreparedStatement stmt = conn.prepareStatement(upsertSql)) {
+        for (int i = 1; i <= 30; i++) {
+          // Zero-padded keys: KEY_001, KEY_002, ..., KEY_030
+          String pk = String.format("KEY_%03d", i);
+          stmt.setString(1, pk);
+          stmt.setString(2, "Value_" + i);
+          stmt.setInt(3, i * 10);
+          stmt.executeUpdate();
+        }
+      }
+      conn.commit();
+    }
+  }
+
+  @Test
+  public void testScanStartKeyOnly() throws Exception {
+    Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      // Test with start key - should get all rows from KEY_010 onwards
+      String sql = "SELECT PK FROM " + fullTableName + " WHERE 
SCAN_START_KEY() = ?";
+      try (PreparedStatement stmt = conn.prepareStatement(sql)) {
+        stmt.setBytes(1, Bytes.toBytes("KEY_010"));
+
+        // Verify scan boundaries are set correctly
+        PhoenixPreparedStatement pstmt = 
stmt.unwrap(PhoenixPreparedStatement.class);
+        QueryPlan plan = pstmt.optimizeQuery(sql);
+        Scan scan = plan.getContext().getScan();
+        assertArrayEquals("SCAN_START_KEY should set scan start row to 
'KEY_010'",
+          Bytes.toBytes("KEY_010"), scan.getStartRow());
+        assertEquals("SCAN_END_KEY not specified, so stop row should be 
empty", 0,
+          scan.getStopRow().length);
+
+        // Verify results
+        try (ResultSet rs = stmt.executeQuery()) {
+          List<String> results = new ArrayList<>();
+          while (rs.next()) {
+            results.add(rs.getString(1));
+          }
+
+          // Should get KEY_010 through KEY_030 (21 rows)
+          assertEquals("Should return 21 rows from KEY_010 to KEY_030 
inclusive", 21,
+            results.size());
+          assertEquals("First result should be KEY_010 (inclusive start 
boundary)", "KEY_010",
+            results.get(0));
+          assertEquals("Last result should be KEY_030 (no end boundary 
specified)", "KEY_030",
+            results.get(results.size() - 1));
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testScanEndKeyOnly() throws Exception {
+    Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      // Test with end key - should get all rows before KEY_020 (exclusive)
+      String sql = "SELECT PK FROM " + fullTableName + " WHERE SCAN_END_KEY() 
= ?";
+      try (PreparedStatement stmt = conn.prepareStatement(sql)) {
+        stmt.setBytes(1, Bytes.toBytes("KEY_020"));
+
+        // Verify scan boundaries are set correctly
+        PhoenixPreparedStatement pstmt = 
stmt.unwrap(PhoenixPreparedStatement.class);
+        QueryPlan plan = pstmt.optimizeQuery(sql);
+        Scan scan = plan.getContext().getScan();
+        assertEquals("SCAN_START_KEY not specified, so start row should be 
empty", 0,
+          scan.getStartRow().length);
+        assertArrayEquals("SCAN_END_KEY should set scan stop row to 'KEY_020'",
+          Bytes.toBytes("KEY_020"), scan.getStopRow());
+
+        // Verify results
+        try (ResultSet rs = stmt.executeQuery()) {
+          List<String> results = new ArrayList<>();
+          while (rs.next()) {
+            results.add(rs.getString(1));
+          }
+
+          // Should get KEY_001 through KEY_019 (19 rows)
+          assertEquals("Should return 19 rows from beginning to KEY_019 
(exclusive end at KEY_020)",
+            19, results.size());
+          assertEquals("First result should be KEY_001 (no start boundary 
specified)", "KEY_001",
+            results.get(0));
+          assertEquals("Last result should be KEY_019 (exclusive end boundary 
at KEY_020)",
+            "KEY_019", results.get(results.size() - 1));
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testScanBothBoundaries() throws Exception {
+    Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      // Test with both start and end keys - should get rows from KEY_010 to 
KEY_020 (exclusive)
+      String sql =
+        "SELECT PK FROM " + fullTableName + " WHERE SCAN_START_KEY() = ? AND 
SCAN_END_KEY() = ?";
+      try (PreparedStatement stmt = conn.prepareStatement(sql)) {
+        stmt.setBytes(1, Bytes.toBytes("KEY_010"));
+        stmt.setBytes(2, Bytes.toBytes("KEY_020"));
+
+        // Verify scan boundaries are set correctly
+        PhoenixPreparedStatement pstmt = 
stmt.unwrap(PhoenixPreparedStatement.class);
+        QueryPlan plan = pstmt.optimizeQuery(sql);
+        Scan scan = plan.getContext().getScan();
+        assertArrayEquals("SCAN_START_KEY should set scan start row to 
'KEY_010'",
+          Bytes.toBytes("KEY_010"), scan.getStartRow());
+        assertArrayEquals("SCAN_END_KEY should set scan stop row to 'KEY_020'",
+          Bytes.toBytes("KEY_020"), scan.getStopRow());
+
+        // Verify results
+        try (ResultSet rs = stmt.executeQuery()) {
+          List<String> results = new ArrayList<>();
+          while (rs.next()) {
+            results.add(rs.getString(1));
+          }
+
+          // Should get KEY_010 through KEY_019 (10 rows)
+          assertEquals("Should return 10 rows from KEY_010 (inclusive) to 
KEY_020 (exclusive)", 10,
+            results.size());
+          assertEquals("First result should be KEY_010 (inclusive start 
boundary)", "KEY_010",
+            results.get(0));
+          assertEquals("Last result should be KEY_019 (exclusive end boundary 
at KEY_020)",
+            "KEY_019", results.get(results.size() - 1));
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testScanBothBoundaries2() throws Exception {
+    Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      // Test with null start and non-null end key
+      String sql =
+        "SELECT PK FROM " + fullTableName + " WHERE SCAN_START_KEY() = ? AND 
SCAN_END_KEY() = ?";
+      try (PreparedStatement stmt = conn.prepareStatement(sql)) {
+        stmt.setBytes(1, null);
+        stmt.setBytes(2, Bytes.toBytes("KEY_020"));
+
+        // Verify scan boundaries are set correctly
+        PhoenixPreparedStatement pstmt = 
stmt.unwrap(PhoenixPreparedStatement.class);
+        QueryPlan plan = pstmt.optimizeQuery(sql);
+        Scan scan = plan.getContext().getScan();
+        assertEquals("SCAN_START_KEY not specified, so start row should be 
empty", 0,
+          scan.getStartRow().length);
+        assertArrayEquals("SCAN_END_KEY should set scan stop row to 'KEY_020'",
+          Bytes.toBytes("KEY_020"), scan.getStopRow());
+
+        // Verify results
+        try (ResultSet rs = stmt.executeQuery()) {
+          List<String> results = new ArrayList<>();
+          while (rs.next()) {
+            results.add(rs.getString(1));
+          }
+
+          // Should get KEY_001 through KEY_019 (19 rows)
+          assertEquals("Should return 19 rows from KEY_001 (inclusive) to 
KEY_020 (exclusive)", 19,
+            results.size());
+          assertEquals("First result should be KEY_001 (inclusive start 
boundary)", "KEY_001",
+            results.get(0));
+          assertEquals("Last result should be KEY_019 (exclusive end boundary 
at KEY_020)",
+            "KEY_019", results.get(results.size() - 1));
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testScanBothBoundaries3() throws Exception {
+    Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      // Test with non-null start and null end key
+      String sql =
+        "SELECT PK FROM " + fullTableName + " WHERE SCAN_START_KEY() = ? AND 
SCAN_END_KEY() = ?";
+      try (PreparedStatement stmt = conn.prepareStatement(sql)) {
+        stmt.setBytes(1, Bytes.toBytes("KEY_020"));
+        stmt.setBytes(2, null);
+
+        // Verify scan boundaries are set correctly
+        PhoenixPreparedStatement pstmt = 
stmt.unwrap(PhoenixPreparedStatement.class);
+        QueryPlan plan = pstmt.optimizeQuery(sql);
+        Scan scan = plan.getContext().getScan();
+        assertArrayEquals("SCAN_START_KEY should set scan start row to 
'KEY_020'",
+          Bytes.toBytes("KEY_020"), scan.getStartRow());
+        assertEquals("SCAN_END_KEY set to null, so stop row should be empty", 
0,
+          scan.getStopRow().length);
+
+        // Verify results
+        try (ResultSet rs = stmt.executeQuery()) {
+          List<String> results = new ArrayList<>();
+          while (rs.next()) {
+            results.add(rs.getString(1));
+          }
+
+          // Should get KEY_020 through KEY_030 (11 rows)
+          assertEquals("Should return 11 rows from KEY_020 (inclusive) to 
empty (exclusive)", 11,
+            results.size());
+          assertEquals("First result should be KEY_020 (inclusive start 
boundary)", "KEY_020",
+            results.get(0));
+          assertEquals("Last result should be KEY_030", "KEY_030", 
results.get(results.size() - 1));
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testScanBoundariesWithAdditionalFilter() throws Exception {
+    Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      // Test with scan boundaries and additional filter
+      String sql = "SELECT PK, COL2 FROM " + fullTableName
+        + " WHERE SCAN_START_KEY() = ? AND SCAN_END_KEY() = ? AND COL2 > ? " + 
"ORDER BY PK, COL2";
+      try (PreparedStatement stmt = conn.prepareStatement(sql)) {
+        stmt.setBytes(1, Bytes.toBytes("KEY_010"));
+        stmt.setBytes(2, Bytes.toBytes("KEY_020"));
+        stmt.setInt(3, 150); // Only rows with COL2 > 150
+
+        // Verify scan boundaries are set correctly
+        PhoenixPreparedStatement pstmt = 
stmt.unwrap(PhoenixPreparedStatement.class);
+        QueryPlan plan = pstmt.optimizeQuery(sql);
+        Scan scan = plan.getContext().getScan();
+        assertArrayEquals("SCAN_START_KEY should set scan start row to 
'KEY_010'",
+          Bytes.toBytes("KEY_010"), scan.getStartRow());
+        assertArrayEquals("SCAN_END_KEY should set scan stop row to 'KEY_020'",
+          Bytes.toBytes("KEY_020"), scan.getStopRow());
+
+        // Verify results
+        try (ResultSet rs = stmt.executeQuery()) {
+          List<String> results = new ArrayList<>();
+          List<Integer> col2Values = new ArrayList<>();
+          while (rs.next()) {
+            results.add(rs.getString(1));
+            col2Values.add(rs.getInt(2));
+          }
+
+          // Should get KEY_016 through KEY_019 (COL2 values: 160, 170, 180, 
190)
+          assertEquals(
+            "Should return 4 rows (KEY_016-KEY_019) that satisfy scan 
boundaries and COL2 > 150", 4,
+            results.size());
+          assertEquals("First result should be KEY_016 (first row in range 
with COL2 > 150)",
+            "KEY_016", results.get(0));
+          assertEquals("Last result should be KEY_019 (last row in range with 
COL2 > 150)",
+            "KEY_019", results.get(results.size() - 1));
+
+          // Verify all COL2 values are > 150
+          for (int i = 0; i < col2Values.size(); i++) {
+            Integer value = col2Values.get(i);
+            assertTrue("COL2 value for " + results.get(i) + " should be > 150, 
but was: " + value,
+              value > 150);
+          }
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testScanBoundariesWithLiterals() throws Exception {
+    Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      // Test with literal values instead of bind parameters
+      String sql = "SELECT PK FROM " + fullTableName
+        + " WHERE SCAN_START_KEY() = 'KEY_005' AND SCAN_END_KEY() = 'KEY_015'";
+      try (PreparedStatement stmt = conn.prepareStatement(sql)) {
+        // Verify scan boundaries are set correctly
+        PhoenixPreparedStatement pstmt = 
stmt.unwrap(PhoenixPreparedStatement.class);
+        QueryPlan plan = pstmt.optimizeQuery(sql);
+        Scan scan = plan.getContext().getScan();
+        assertArrayEquals("SCAN_START_KEY literal should set scan start row to 
'KEY_005'",
+          Bytes.toBytes("KEY_005"), scan.getStartRow());
+        assertArrayEquals("SCAN_END_KEY literal should set scan stop row to 
'KEY_015'",
+          Bytes.toBytes("KEY_015"), scan.getStopRow());
+
+        // Verify results
+        try (ResultSet rs = stmt.executeQuery()) {
+          List<String> results = new ArrayList<>();
+          while (rs.next()) {
+            results.add(rs.getString(1));
+          }
+
+          // Should get KEY_005 through KEY_014 (10 rows)
+          assertEquals(
+            "Should return 10 rows from KEY_005 (inclusive) to KEY_015 
(exclusive) using literals",
+            10, results.size());
+          assertEquals("First result should be KEY_005 (inclusive start 
boundary from literal)",
+            "KEY_005", results.get(0));
+          assertEquals(
+            "Last result should be KEY_014 (exclusive end boundary at KEY_015 
from literal)",
+            "KEY_014", results.get(results.size() - 1));
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testScanBoundariesWithColumnFilter() throws Exception {
+    Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      // Test with scan boundaries and column-based filter
+      String sql = "SELECT PK, COL1 FROM " + fullTableName
+        + " WHERE SCAN_START_KEY() = ? AND COL1 LIKE 'Value_1%'";
+      try (PreparedStatement stmt = conn.prepareStatement(sql)) {
+        stmt.setBytes(1, Bytes.toBytes("KEY_010"));
+
+        // Verify scan boundaries are set correctly
+        PhoenixPreparedStatement pstmt = 
stmt.unwrap(PhoenixPreparedStatement.class);
+        QueryPlan plan = pstmt.optimizeQuery(sql);
+        Scan scan = plan.getContext().getScan();
+        assertArrayEquals("SCAN_START_KEY should set scan start row to 
'KEY_010'",
+          Bytes.toBytes("KEY_010"), scan.getStartRow());
+        assertEquals("SCAN_END_KEY not specified, so stop row should be 
empty", 0,
+          scan.getStopRow().length);
+
+        // Verify results
+        try (ResultSet rs = stmt.executeQuery()) {
+          List<String> results = new ArrayList<>();
+          List<String> col1Values = new ArrayList<>();
+          while (rs.next()) {
+            results.add(rs.getString(1));
+            col1Values.add(rs.getString(2));
+          }
+
+          // Should get KEY_010 through KEY_019 (rows with COL1 like 
'Value_1%')
+          assertEquals("Should return 10 rows from KEY_010 onwards that match 
COL1 LIKE 'Value_1%'",
+            10, results.size());
+          assertEquals("First result should be KEY_010 (start boundary with 
matching COL1)",
+            "KEY_010", results.get(0));
+          assertEquals("Last result should be KEY_019 (last row with COL1 
starting with 'Value_1')",
+            "KEY_019", results.get(results.size() - 1));
+
+          // Verify all COL1 values match the pattern
+          for (int i = 0; i < col1Values.size(); i++) {
+            String value = col1Values.get(i);
+            assertTrue("COL1 value for " + results.get(i)
+              + " should start with 'Value_1', but was: " + value, 
value.startsWith("Value_1"));
+          }
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testScanBoundariesEmptyResult() throws Exception {
+    Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      // Test with scan boundaries that should return no results
+      String sql = "SELECT PK FROM " + fullTableName
+        + " WHERE SCAN_START_KEY() = ? AND SCAN_END_KEY() = ? ORDER BY PK";
+      try (PreparedStatement stmt = conn.prepareStatement(sql)) {
+        stmt.setBytes(1, Bytes.toBytes("KEY_020"));
+        stmt.setBytes(2, Bytes.toBytes("KEY_015")); // End before start - 
should be empty
+
+        // Verify scan boundaries are set correctly
+        PhoenixPreparedStatement pstmt = 
stmt.unwrap(PhoenixPreparedStatement.class);
+        QueryPlan plan = pstmt.optimizeQuery(sql);
+        Scan scan = plan.getContext().getScan();
+        assertArrayEquals("SCAN_START_KEY should set scan start row to 
'KEY_020'",
+          Bytes.toBytes("KEY_020"), scan.getStartRow());
+        assertArrayEquals("SCAN_END_KEY should set scan stop row to 'KEY_015'",
+          Bytes.toBytes("KEY_015"), scan.getStopRow());
+
+        // Verify results
+        try (ResultSet rs = stmt.executeQuery()) {
+          List<String> results = new ArrayList<>();
+          while (rs.next()) {
+            results.add(rs.getString(1));
+          }
+
+          // Should get no results
+          assertEquals("Should return 0 rows when start key (KEY_020) > end 
key (KEY_015)", 0,
+            results.size());
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testScanBoundariesWithComplexFilter() throws Exception {
+    Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      // Test with scan boundaries and complex filter conditions
+      String sql = "SELECT PK, COL1, COL2 FROM " + fullTableName
+        + " WHERE SCAN_START_KEY() = ? AND SCAN_END_KEY() = ? "
+        + " AND (COL2 BETWEEN ? AND ? OR COL1 = ?) ORDER BY PK";
+      try (PreparedStatement stmt = conn.prepareStatement(sql)) {
+        stmt.setBytes(1, Bytes.toBytes("KEY_005"));
+        stmt.setBytes(2, Bytes.toBytes("KEY_025"));
+        stmt.setInt(3, 100);
+        stmt.setInt(4, 120);
+        stmt.setString(5, "Value_15");
+
+        // Verify scan boundaries are set correctly
+        PhoenixPreparedStatement pstmt = 
stmt.unwrap(PhoenixPreparedStatement.class);
+        QueryPlan plan = pstmt.optimizeQuery(sql);
+        Scan scan = plan.getContext().getScan();
+        assertArrayEquals("SCAN_START_KEY should set scan start row to 
'KEY_005'",
+          Bytes.toBytes("KEY_005"), scan.getStartRow());
+        assertArrayEquals("SCAN_END_KEY should set scan stop row to 'KEY_025'",
+          Bytes.toBytes("KEY_025"), scan.getStopRow());
+
+        // Verify results
+        try (ResultSet rs = stmt.executeQuery()) {
+          List<String> results = new ArrayList<>();
+          while (rs.next()) {
+            String pk = rs.getString(1);
+            String col1 = rs.getString(2);
+            int col2 = rs.getInt(3);
+            results.add(pk);
+
+            // Verify the filter condition is satisfied
+            assertTrue(
+              "Filter condition not satisfied for " + pk + " (COL2=" + col2 + 
", COL1=" + col1
+                + "). Expected: (COL2 BETWEEN 100 AND 120) OR COL1='Value_15'",
+              (col2 >= 100 && col2 <= 120) || col1.equals("Value_15"));
+          }
+
+          // Should get KEY_010, KEY_011, KEY_012, KEY_015 (4 rows) in order
+          assertEquals(
+            "Should return 4 rows that satisfy complex filter within scan 
boundaries KEY_005 to KEY_025",
+            4, results.size());
+          assertEquals("First result should be KEY_010 (COL2=100, within 
BETWEEN range)", "KEY_010",
+            results.get(0));
+          assertEquals("Second result should be KEY_011 (COL2=110, within 
BETWEEN range)",
+            "KEY_011", results.get(1));
+          assertEquals("Third result should be KEY_012 (COL2=120, within 
BETWEEN range)", "KEY_012",
+            results.get(2));
+          assertEquals("Fourth result should be KEY_015 (COL1='Value_15', 
matches OR condition)",
+            "KEY_015", results.get(3));
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testScanBoundariesFail() throws Exception {
+    Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      // Test with scan boundaries in OR condition - this should fail
+      String sql =
+        "SELECT PK FROM " + fullTableName + " WHERE SCAN_START_KEY() = ? OR 
SCAN_END_KEY() = ?";
+      try (PreparedStatement stmt = conn.prepareStatement(sql)) {
+        stmt.setBytes(1, Bytes.toBytes("KEY_010"));
+        stmt.setBytes(2, Bytes.toBytes("KEY_020"));
+
+        try {
+          PhoenixPreparedStatement pstmt = 
stmt.unwrap(PhoenixPreparedStatement.class);
+          try (ResultSet rs = stmt.executeQuery()) {
+            rs.next();
+            throw new AssertionError("Should not reach here");
+          }
+        } catch (Exception e) {
+          assertTrue("ScanStartKeyFunction should not be instantiated",
+            e.getMessage().contains("java.lang.InstantiationException: "
+              + 
"org.apache.phoenix.expression.function.ScanStartKeyFunction"));
+        }
+      }
+
+      sql = "SELECT PK FROM " + fullTableName + " WHERE SCAN_END_KEY() <= ?";
+      try (PreparedStatement stmt = conn.prepareStatement(sql)) {
+        stmt.setBytes(1, Bytes.toBytes("KEY_010"));
+        try {
+          PhoenixPreparedStatement pstmt = 
stmt.unwrap(PhoenixPreparedStatement.class);
+          try (ResultSet rs = stmt.executeQuery()) {
+            rs.next();
+            throw new AssertionError("Should not reach here");
+          }
+        } catch (Exception e) {
+          assertTrue("ScanEndKeyFunction should not be instantiated",
+            e.getMessage().contains("java.lang.InstantiationException: "
+              + "org.apache.phoenix.expression.function.ScanEndKeyFunction"));
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testScanBoundariesMixedWithOrConditionShouldWork() throws 
Exception {
+    Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      // Test scan boundaries combined with other OR conditions - this should 
work
+      // The scan boundaries should be applied, and the OR condition should be 
evaluated as a filter
+      String sql = "SELECT PK, COL1, COL2 FROM " + fullTableName
+        + " WHERE SCAN_START_KEY() = ? AND SCAN_END_KEY() = ? AND (COL1 = ? OR 
COL2 = ?)";
+      try (PreparedStatement stmt = conn.prepareStatement(sql)) {
+        stmt.setBytes(1, Bytes.toBytes("KEY_010"));
+        stmt.setBytes(2, Bytes.toBytes("KEY_020"));
+        stmt.setString(3, "Value_12"); // Should match KEY_012
+        stmt.setInt(4, 150); // Should match KEY_015 (but KEY_015 is outside 
our scan range)
+
+        // Verify scan boundaries are set correctly
+        PhoenixPreparedStatement pstmt = 
stmt.unwrap(PhoenixPreparedStatement.class);
+        QueryPlan plan = pstmt.optimizeQuery(sql);
+        Scan scan = plan.getContext().getScan();
+        assertArrayEquals(
+          "SCAN_START_KEY should set scan start row to 'KEY_010' even with OR 
in additional filters",
+          Bytes.toBytes("KEY_010"), scan.getStartRow());
+        assertArrayEquals(
+          "SCAN_END_KEY should set scan stop row to 'KEY_020' even with OR in 
additional filters",
+          Bytes.toBytes("KEY_020"), scan.getStopRow());
+
+        // Verify results
+        try (ResultSet rs = stmt.executeQuery()) {
+          List<String> results = new ArrayList<>();
+          while (rs.next()) {
+            String pk = rs.getString(1);
+            String col1 = rs.getString(2);
+            int col2 = rs.getInt(3);
+            results.add(pk);
+
+            // Verify the filter condition is satisfied within the scan range
+            assertTrue(
+              "Filter condition not satisfied for " + pk + " (COL1=" + col1 + 
", COL2=" + col2
+                + "). Expected: COL1='Value_12' OR COL2=150",
+              col1.equals("Value_12") || col2 == 150);
+          }
+
+          // Should get KEY_012 (COL1='Value_12') and KEY_015 (COL2=150) in 
order
+          assertEquals("Should return 2 rows", 2, results.size());
+          assertEquals("First result should be KEY_012 which has 
COL1='Value_12'", "KEY_012",
+            results.get(0));
+          assertEquals("Second result should be KEY_015 which has COL2=150", 
"KEY_015",
+            results.get(1));
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testInvertedScanBoundaries() throws Exception {
+    Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      // Test scan boundaries combined with other OR conditions - this should 
work
+      // The scan boundaries should be applied, and the OR condition should be 
evaluated as a filter
+      String sql = "SELECT PK, COL1, COL2 FROM " + fullTableName
+        + " WHERE SCAN_START_KEY() = ? AND SCAN_END_KEY() = ? AND (COL1 = ? OR 
COL2 = ?)";
+      try (PreparedStatement stmt = conn.prepareStatement(sql)) {
+        stmt.setBytes(1, Bytes.toBytes("KEY_020"));
+        stmt.setBytes(2, Bytes.toBytes("KEY_010"));
+        stmt.setString(3, "Value_12");
+        stmt.setInt(4, 150);
+
+        // Verify scan boundaries are set correctly
+        PhoenixPreparedStatement pstmt = 
stmt.unwrap(PhoenixPreparedStatement.class);
+        QueryPlan plan = pstmt.optimizeQuery(sql);
+        Scan scan = plan.getContext().getScan();
+        assertArrayEquals(
+          "SCAN_START_KEY should set scan start row to 'KEY_020' even with OR 
in additional"
+            + " filters",
+          Bytes.toBytes("KEY_020"), scan.getStartRow());
+        assertArrayEquals(
+          "SCAN_END_KEY should set scan stop row to 'KEY_010' even with OR in 
additional "
+            + "filters",
+          Bytes.toBytes("KEY_010"), scan.getStopRow());
+
+        try (ResultSet rs = stmt.executeQuery()) {
+          assertFalse("No rows should be found", rs.next());
+        }
+      }
+    }
+  }
+
+}
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TotalSegmentsFunction2IT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TotalSegmentsFunction2IT.java
new file mode 100644
index 0000000000..c98c7a91ae
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TotalSegmentsFunction2IT.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Base64;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests for TOTAL_SEGMENTS() with VARBINARY_ENCODED primary key
+ */
+@Category(ParallelStatsDisabledTest.class)
+public class TotalSegmentsFunction2IT extends BaseTotalSegmentsFunctionIT {
+
+  @Override
+  protected String getPrimaryKeyColumnType() {
+    return "VARBINARY_ENCODED";
+  }
+
+  @Override
+  protected String getCompositeKeyColumnType() {
+    return "VARBINARY_ENCODED";
+  }
+
+  @Override
+  protected String extractPrimaryKeyValue(ResultSet rs) throws SQLException {
+    return Base64.getEncoder().encodeToString(rs.getBytes(1));
+  }
+}
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TotalSegmentsFunctionIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TotalSegmentsFunctionIT.java
new file mode 100644
index 0000000000..22443ce966
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TotalSegmentsFunctionIT.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import org.junit.experimental.categories.Category;
+
+/**
+ * TOTAL_SEGMENTS() integration tests specialised for a VARCHAR primary key. The test scenarios
+ * are inherited from {@link BaseTotalSegmentsFunctionIT}; this class only supplies the key column
+ * type and the way a key value is read back from a result row.
+ */
+@Category(ParallelStatsDisabledTest.class)
+public class TotalSegmentsFunctionIT extends BaseTotalSegmentsFunctionIT {
+
+  /** SQL type name reported for both the primary key and the composite key hooks. */
+  private static final String KEY_COLUMN_TYPE = "VARCHAR";
+
+  /** @return the SQL type name used for the primary key column */
+  @Override
+  protected String getPrimaryKeyColumnType() {
+    return KEY_COLUMN_TYPE;
+  }
+
+  /** @return the SQL type name used for the composite key column */
+  @Override
+  protected String getCompositeKeyColumnType() {
+    return KEY_COLUMN_TYPE;
+  }
+
+  /**
+   * Reads column 1 of the current row as a string.
+   * @param rs result set positioned on the row to read
+   * @return the string value of column 1
+   * @throws SQLException if the column cannot be read
+   */
+  @Override
+  protected String extractPrimaryKeyValue(ResultSet rs) throws SQLException {
+    final String keyValue = rs.getString(1);
+    return keyValue;
+  }
+}
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanBoundaryFunctionTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanBoundaryFunctionTest.java
new file mode 100644
index 0000000000..96323aff2f
--- /dev/null
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/compile/ScanBoundaryFunctionTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.util.Properties;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
+import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.junit.Test;
+
+public class ScanBoundaryFunctionTest extends BaseConnectionlessQueryTest {
+
+  @Test
+  public void testScanStartKeyWithLiteral() throws Exception {
+    Properties props = new Properties();
+    PhoenixConnection conn = (PhoenixConnection) 
DriverManager.getConnection(getUrl(), props);
+
+    String sql = "SELECT * FROM SYSTEM.CATALOG WHERE SCAN_START_KEY() = ?";
+    PreparedStatement stmt = conn.prepareStatement(sql);
+    stmt.setBytes(1, Bytes.toBytes("startkey"));
+    PhoenixPreparedStatement pstmt = 
stmt.unwrap(PhoenixPreparedStatement.class);
+
+    QueryPlan plan = pstmt.optimizeQuery(sql);
+    Scan scan = plan.getContext().getScan();
+
+    assertNotNull(scan.getStartRow());
+    assertArrayEquals(Bytes.toBytes("startkey"), scan.getStartRow());
+  }
+
+  @Test
+  public void testScanEndKeyWithLiteral() throws Exception {
+    Properties props = new Properties();
+    PhoenixConnection conn = (PhoenixConnection) 
DriverManager.getConnection(getUrl(), props);
+
+    String sql = "SELECT * FROM SYSTEM.CATALOG WHERE SCAN_END_KEY() = ?";
+    PreparedStatement stmt = conn.prepareStatement(sql);
+    stmt.setBytes(1, Bytes.toBytes("endkey"));
+    PhoenixPreparedStatement pstmt = 
stmt.unwrap(PhoenixPreparedStatement.class);
+
+    QueryPlan plan = pstmt.optimizeQuery(sql);
+    Scan scan = plan.getContext().getScan();
+
+    assertNotNull(scan.getStopRow());
+    assertArrayEquals(Bytes.toBytes("endkey"), scan.getStopRow());
+  }
+
+  @Test
+  public void testScanBothKeysWithLiterals() throws Exception {
+    Properties props = new Properties();
+    PhoenixConnection conn = (PhoenixConnection) 
DriverManager.getConnection(getUrl(), props);
+
+    String sql = "SELECT * FROM SYSTEM.CATALOG WHERE SCAN_START_KEY() = ? AND 
SCAN_END_KEY() = ?";
+    PreparedStatement stmt = conn.prepareStatement(sql);
+    stmt.setBytes(1, Bytes.toBytes("statz--__39gf04i583130~305i-4"));
+    stmt.setBytes(2, Bytes.toBytes("##__39gf04i583130~305i-end-4"));
+    PhoenixPreparedStatement pstmt = 
stmt.unwrap(PhoenixPreparedStatement.class);
+
+    QueryPlan plan = pstmt.optimizeQuery(sql);
+    Scan scan = plan.getContext().getScan();
+
+    assertNotNull(scan.getStartRow());
+    assertNotNull(scan.getStopRow());
+    assertArrayEquals(Bytes.toBytes("statz--__39gf04i583130~305i-4"), 
scan.getStartRow());
+    assertArrayEquals(Bytes.toBytes("##__39gf04i583130~305i-end-4"), 
scan.getStopRow());
+  }
+
+  @Test
+  public void testScanStartKeyWithBindParameter() throws Exception {
+    Properties props = new Properties();
+    PhoenixConnection conn = (PhoenixConnection) 
DriverManager.getConnection(getUrl(), props);
+
+    String sql = "SELECT * FROM SYSTEM.CATALOG WHERE SCAN_START_KEY() = ?";
+    PreparedStatement stmt = conn.prepareStatement(sql);
+    stmt.setBytes(1, Bytes.toBytes("bindstart"));
+
+    PhoenixPreparedStatement pstmt = 
stmt.unwrap(PhoenixPreparedStatement.class);
+    QueryPlan plan = pstmt.optimizeQuery(sql);
+    Scan scan = plan.getContext().getScan();
+
+    assertNotNull(scan.getStartRow());
+    assertArrayEquals(Bytes.toBytes("bindstart"), scan.getStartRow());
+    assertArrayEquals(new byte[0], scan.getStopRow());
+  }
+
+  @Test
+  public void testScanStartKeyWithBindParameter2() throws Exception {
+    Properties props = new Properties();
+    PhoenixConnection conn = (PhoenixConnection) 
DriverManager.getConnection(getUrl(), props);
+
+    String sql = "SELECT * FROM SYSTEM.CATALOG WHERE SCAN_START_KEY() = ? AND 
SCAN_END_KEY() = ?";
+    PreparedStatement stmt = conn.prepareStatement(sql);
+    stmt.setBytes(1, Bytes.toBytes("bindstart"));
+    stmt.setBytes(2, null);
+
+    PhoenixPreparedStatement pstmt = 
stmt.unwrap(PhoenixPreparedStatement.class);
+    QueryPlan plan = pstmt.optimizeQuery(sql);
+    Scan scan = plan.getContext().getScan();
+
+    assertNotNull(scan.getStartRow());
+    assertArrayEquals(Bytes.toBytes("bindstart"), scan.getStartRow());
+    assertArrayEquals(new byte[0], scan.getStopRow());
+  }
+
+  @Test
+  public void testScanEndKeyWithBindParameter() throws Exception {
+    Properties props = new Properties();
+    PhoenixConnection conn = (PhoenixConnection) 
DriverManager.getConnection(getUrl(), props);
+
+    String sql = "SELECT * FROM SYSTEM.CATALOG WHERE SCAN_END_KEY() = ?";
+    PreparedStatement stmt = conn.prepareStatement(sql);
+    stmt.setBytes(1, Bytes.toBytes("bindend"));
+
+    PhoenixPreparedStatement pstmt = 
stmt.unwrap(PhoenixPreparedStatement.class);
+    QueryPlan plan = pstmt.optimizeQuery(sql);
+    Scan scan = plan.getContext().getScan();
+
+    assertNotNull(scan.getStopRow());
+    assertArrayEquals(Bytes.toBytes("bindend"), scan.getStopRow());
+    assertArrayEquals(new byte[0], scan.getStartRow());
+  }
+
+  @Test
+  public void testScanEndKeyWithBindParameter2() throws Exception {
+    Properties props = new Properties();
+    PhoenixConnection conn = (PhoenixConnection) 
DriverManager.getConnection(getUrl(), props);
+
+    String sql = "SELECT * FROM SYSTEM.CATALOG WHERE SCAN_START_KEY() = ? AND 
SCAN_END_KEY() = ?";
+    PreparedStatement stmt = conn.prepareStatement(sql);
+    stmt.setBytes(1, new byte[0]);
+    stmt.setBytes(2, Bytes.toBytes("bindend"));
+
+    PhoenixPreparedStatement pstmt = 
stmt.unwrap(PhoenixPreparedStatement.class);
+    QueryPlan plan = pstmt.optimizeQuery(sql);
+    Scan scan = plan.getContext().getScan();
+
+    assertNotNull(scan.getStopRow());
+    assertArrayEquals(Bytes.toBytes("bindend"), scan.getStopRow());
+    assertArrayEquals(new byte[0], scan.getStartRow());
+  }
+
+  @Test
+  public void testScanBothKeysWithBindParameters() throws Exception {
+    Properties props = new Properties();
+    PhoenixConnection conn = (PhoenixConnection) 
DriverManager.getConnection(getUrl(), props);
+
+    String sql = "SELECT * FROM SYSTEM.CATALOG WHERE SCAN_START_KEY() = ? AND 
SCAN_END_KEY() = ?";
+    PreparedStatement stmt = conn.prepareStatement(sql);
+    stmt.setBytes(1, Bytes.toBytes("bindstart"));
+    stmt.setBytes(2, Bytes.toBytes("bindend"));
+
+    PhoenixPreparedStatement pstmt = 
stmt.unwrap(PhoenixPreparedStatement.class);
+    QueryPlan plan = pstmt.optimizeQuery(sql);
+    Scan scan = plan.getContext().getScan();
+
+    assertNotNull(scan.getStartRow());
+    assertNotNull(scan.getStopRow());
+    assertArrayEquals(Bytes.toBytes("bindstart"), scan.getStartRow());
+    assertArrayEquals(Bytes.toBytes("bindend"), scan.getStopRow());
+  }
+
+  @Test
+  public void testScanWithRegularWhereAndBoundary() throws Exception {
+    Properties props = new Properties();
+    PhoenixConnection conn = (PhoenixConnection) 
DriverManager.getConnection(getUrl(), props);
+
+    // This test verifies that scan boundary functions work alongside regular 
WHERE conditions
+    String sql = "SELECT * FROM SYSTEM.CATALOG WHERE TABLE_NAME = 'TEST' AND 
SCAN_END_KEY() = ?";
+    PreparedStatement stmt = conn.prepareStatement(sql);
+    stmt.setBytes(1, Bytes.toBytes("boundary"));
+
+    PhoenixPreparedStatement pstmt = 
stmt.unwrap(PhoenixPreparedStatement.class);
+    QueryPlan plan = pstmt.optimizeQuery(sql);
+    Scan scan = plan.getContext().getScan();
+
+    assertNotNull(scan.getStopRow());
+    assertNotNull(scan.getStartRow());
+    assertArrayEquals(Bytes.toBytesBinary("\\x00\\x00TEST"), 
scan.getStartRow());
+    assertArrayEquals(Bytes.toBytes("boundary"), scan.getStopRow());
+
+    sql =
+      "SELECT * FROM SYSTEM.CATALOG WHERE TABLE_NAME = 'TEST' AND 
SCAN_START_KEY() = ? AND SCAN_END_KEY() = ?";
+    stmt = conn.prepareStatement(sql);
+    stmt.setBytes(1, Bytes.toBytes("boundary"));
+    stmt.setBytes(2, Bytes.toBytes("boundary"));
+
+    pstmt = stmt.unwrap(PhoenixPreparedStatement.class);
+    plan = pstmt.optimizeQuery(sql);
+    scan = plan.getContext().getScan();
+
+    assertNotNull(scan.getStopRow());
+    assertNotNull(scan.getStartRow());
+    assertArrayEquals(Bytes.toBytesBinary("\\x00\\x00TEST"), 
scan.getStartRow());
+    assertArrayEquals(Bytes.toBytes("boundary"), scan.getStopRow());
+  }
+}
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/expression/function/BuiltinFunctionConstructorTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/expression/function/BuiltinFunctionConstructorTest.java
index 4a09dd058a..a7c725fccd 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/expression/function/BuiltinFunctionConstructorTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/expression/function/BuiltinFunctionConstructorTest.java
@@ -63,7 +63,11 @@ public class BuiltinFunctionConstructorTest {
     ExpressionType[] types = ExpressionType.values();
     for (int i = 0; i < types.length; i++) {
       try {
-        if 
(!AggregateFunction.class.isAssignableFrom(types[i].getExpressionClass())) {
+        if (
+          
!AggregateFunction.class.isAssignableFrom(types[i].getExpressionClass())
+            && !ExpressionType.EXPRESSION_TYPES_NOT_SUPPORTED_AT_SERVER
+              .contains(types[i].getExpressionClass())
+        ) {
           Constructor cons = 
types[i].getExpressionClass().getDeclaredConstructor();
           cons.setAccessible(true);
           cons.newInstance();


Reply via email to