[ 
https://issues.apache.org/jira/browse/PHOENIX-6501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17503893#comment-17503893
 ] 

ASF GitHub Bot commented on PHOENIX-6501:
-----------------------------------------

tkhurana commented on a change in pull request #1399:
URL: https://github.com/apache/phoenix/pull/1399#discussion_r823155600



##########
File path: 
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java
##########
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME;
+import static 
org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY;
+import static org.apache.phoenix.query.QueryServices.INDEX_PAGE_SIZE_IN_ROWS;
+import static org.apache.phoenix.util.ScanUtil.getDummyResult;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.execute.TupleProjector;
+import org.apache.phoenix.hbase.index.parallel.*;
+import org.apache.phoenix.hbase.index.table.HTableFactory;
+import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.tuple.ResultTuple;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is an index table region scanner which scans index table rows locally 
and then extracts
+ * data table row keys from them. Using the data table row keys, the data 
table rows are scanned
+ * using the HBase client available to region servers.
+ */
+public class UncoveredGlobalIndexRegionScanner extends BaseRegionScanner {
+    public static final String NUM_CONCURRENT_INDEX_THREADS_CONF_KEY = 
"index.threads.max";
+    public static final int DEFAULT_CONCURRENT_INDEX_THREADS = 16;
+    public static final String INDEX_ROW_COUNTS_PER_TASK_CONF_KEY = 
"index.row.count.per.task";
+    public static final int DEFAULT_INDEX_ROW_COUNTS_PER_TASK = 2048;
+
+    /**
+     * The states of the processing a page of index rows
+     */
+    private enum State {
+        INITIAL, SCANNING_INDEX, SCANNING_DATA, SCANNING_DATA_INTERRUPTED, 
READY
+    }
+    State state = State.INITIAL;
+    private static final Logger LOGGER =
+            LoggerFactory.getLogger(UncoveredGlobalIndexRegionScanner.class);
+    protected final byte[][] viewConstants;
+    protected final RegionCoprocessorEnvironment env;
+    protected byte[][] regionEndKeys;
+    protected final Table dataHTable;
+    protected final int pageSizeInRows;
+    protected final int rowCountPerTask;
+    protected final Scan scan;
+    protected final Scan dataTableScan;
+    protected final RegionScanner innerScanner;
+    protected final Region region;
+    protected final IndexMaintainer indexMaintainer;
+    protected final TupleProjector tupleProjector;
+    protected final ImmutableBytesWritable ptr;
+    protected final TaskRunner pool;
+    protected String exceptionMessage;
+    protected final HTableFactory hTableFactory;
+    protected List<List<Cell>> indexRows = null;
+    protected Map<ImmutableBytesPtr, Result> dataRows = null;
+    protected Iterator<List<Cell>> indexRowIterator = null;
+    protected Map<byte[], byte[]> indexToDataRowKeyMap = null;
+    protected int indexRowCount = 0;
+    protected final long pageSizeMs;
+    protected byte[] lastIndexRowKey = null;
+
+    public UncoveredGlobalIndexRegionScanner(final RegionScanner innerScanner,
+                                             final Region region,
+                                             final Scan scan,
+                                             final 
RegionCoprocessorEnvironment env,
+                                             final Scan dataTableScan,
+                                             final TupleProjector 
tupleProjector,
+                                             final IndexMaintainer 
indexMaintainer,
+                                             final byte[][] viewConstants,
+                                             final ImmutableBytesWritable ptr,
+                                             final long pageSizeMs)
+            throws IOException {
+        super(innerScanner);
+        final Configuration config = env.getConfiguration();
+
+        byte[] pageSizeFromScan =
+                scan.getAttribute(BaseScannerRegionObserver.INDEX_PAGE_ROWS);
+        if (pageSizeFromScan != null) {
+            pageSizeInRows = (int) Bytes.toLong(pageSizeFromScan);
+        } else {
+            pageSizeInRows = (int)
+                    config.getLong(INDEX_PAGE_SIZE_IN_ROWS,
+                            
QueryServicesOptions.DEFAULT_INDEX_PAGE_SIZE_IN_ROWS);
+        }
+
+        this.indexMaintainer = indexMaintainer;
+        this.viewConstants = viewConstants;
+        this.scan = scan;
+        this.dataTableScan = dataTableScan;
+        this.innerScanner = innerScanner;
+        this.region = region;
+        this.env = env;
+        this.ptr = ptr;
+        this.tupleProjector = tupleProjector;
+        this.pageSizeMs = pageSizeMs;
+        hTableFactory = IndexWriterUtils.getDefaultDelegateHTableFactory(env);
+        rowCountPerTask = config.getInt(INDEX_ROW_COUNTS_PER_TASK_CONF_KEY,
+                DEFAULT_INDEX_ROW_COUNTS_PER_TASK);
+
+        pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor(
+                new ThreadPoolBuilder("Uncovered Global Index",
+                        
env.getConfiguration()).setMaxThread(NUM_CONCURRENT_INDEX_THREADS_CONF_KEY,
+                        DEFAULT_CONCURRENT_INDEX_THREADS).setCoreTimeout(
+                        INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env));
+        byte[] dataTableName = scan.getAttribute(PHYSICAL_DATA_TABLE_NAME);
+        dataHTable = hTableFactory.getTable(new 
ImmutableBytesPtr(dataTableName));
+        try (org.apache.hadoop.hbase.client.Connection connection =
+                     
HBaseFactoryProvider.getHConnectionFactory().createConnection(
+                             env.getConfiguration())) {
+            regionEndKeys = 
connection.getRegionLocator(dataHTable.getName()).getEndKeys();
+        }
+    }
+
+    @Override
+    public long getMvccReadPoint() {
+        return innerScanner.getMvccReadPoint();
+    }
+    @Override
+    public RegionInfo getRegionInfo() {
+        return region.getRegionInfo();
+    }
+
+    @Override
+    public boolean isFilterDone() {
+        return false;
+    }
+
+    @Override
+    public void close() throws IOException {
+        innerScanner.close();
+        hTableFactory.shutdown();
+        if (dataHTable != null) {
+            dataHTable.close();
+        }
+        this.pool.stop("UncoveredGlobalIndexRegionScanner is closing");
+    }
+
+    @Override
+    public long getMaxResultSize() {
+        return innerScanner.getMaxResultSize();
+    }
+
+    @Override
+    public int getBatch() {
+        return innerScanner.getBatch();
+    }
+
+    private void scanDataRows(Set<byte[]> dataRowKeys, long startTime) throws 
IOException {
+        List<KeyRange> keys = new ArrayList<>(dataRowKeys.size());
+        for (byte[] dataRowKey: dataRowKeys) {
+            keys.add(PVarbinary.INSTANCE.getKeyRange(dataRowKey));
+        }
+        ScanRanges scanRanges = ScanRanges.createPointLookup(keys);
+        Scan dataScan = new Scan(dataTableScan);
+        dataScan.setTimeRange(scan.getTimeRange().getMin(), 
scan.getTimeRange().getMax());
+        scanRanges.initializeScan(dataScan);
+        SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
+        dataScan.setFilter(new SkipScanFilter(skipScanFilter, false));
+        try (ResultScanner resultScanner = dataHTable.getScanner(dataScan)) {
+            for (Result result = resultScanner.next(); (result != null);
+                 result = resultScanner.next()) {
+                dataRows.put(new ImmutableBytesPtr(result.getRow()), result);
+                if ((EnvironmentEdgeManager.currentTimeMillis() - startTime) 
>= pageSizeMs) {
+                    LOGGER.info("One of the scan tasks in 
UncoveredGlobalIndexRegionScanner"
+                            + " for region " + 
region.getRegionInfo().getRegionNameAsString()
+                            + " could not complete on time (in " + pageSizeMs+ 
" ms) and"
+                            + " will be resubmitted");
+                    state = State.SCANNING_DATA_INTERRUPTED;
+                    break;
+                }
+            }
+        } catch (Throwable t) {
+            ServerUtil.throwIOException(dataHTable.getName().toString(), t);
+        }
+    }
+
+    private void addTasksForScanningDataTableRowsInParallel(TaskBatch<Boolean> 
tasks,
+                                                            final Set<byte[]> 
dataRowKeys,
+                                                            final long 
startTime) {
+        tasks.add(new Task<Boolean>() {
+            @Override
+            public Boolean call() throws Exception {
+                try {
+                    //in HBase 1.x we could check if the coproc environment 
was closed or aborted,
+                    //but in HBase 2.x the coproc environment can't check 
region server services
+                    if (Thread.currentThread().isInterrupted()) {
+                        exceptionMessage = "Pool closed, not retrieving data 
table rows for "
+                                + 
region.getRegionInfo().getRegionNameAsString();
+                        throw new IOException(exceptionMessage);
+                    }
+                    scanDataRows(dataRowKeys, startTime);
+                } catch (Exception e) {
+                    throw e;
+                }
+                return Boolean.TRUE;
+            }
+        });
+    }
+
+
+    protected void submitTasks(TaskBatch<Boolean> tasks) throws IOException {
+        Pair<List<Boolean>, List<Future<Boolean>>> resultsAndFutures = null;
+        try {
+            LOGGER.debug("Waiting on index tasks to complete...");
+            resultsAndFutures = this.pool.submitUninterruptible(tasks);
+        } catch (ExecutionException e) {
+            throw new RuntimeException(
+                    "Should not fail on the results while using a 
WaitForCompletionTaskRunner", e);
+        } catch (EarlyExitFailure e) {
+            throw new RuntimeException("Stopped while waiting for batch, 
quitting!", e);
+        }
+        int index = 0;
+        for (Boolean result : resultsAndFutures.getFirst()) {
+            if (result == null) {
+                Throwable cause = ServerUtil.getExceptionFromFailedFuture(
+                        resultsAndFutures.getSecond().get(index));
+                // there was a failure
+                throw new IOException(exceptionMessage, cause);

Review comment:
       exceptionMessage might not be set.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


> Use batching when joining data table rows with uncovered global index rows
> --------------------------------------------------------------------------
>
>                 Key: PHOENIX-6501
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-6501
>             Project: Phoenix
>          Issue Type: Improvement
>    Affects Versions: 5.1.2
>            Reporter: Kadir Ozdemir
>            Assignee: Kadir OZDEMIR
>            Priority: Major
>         Attachments: PHOENIX-6501.master.001.patch
>
>
> PHOENIX-6458 extends the existing uncovered local index support for global 
> indexes. The current solution uses HBase get operations to join data table 
> rows with uncovered index rows on the server side. Doing a separate RPC call 
> for every data table row can be expensive. Instead, we can buffer lots of 
> data row keys in memory,  use a skip scan filter and even multiple threads to 
> issue a separate scan for each data table region in parallel. This will 
> reduce the cost of join and also improve the performance.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to