This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 7cb3162c2a Revert "PHOENIX-6776 :- Abort scans of closed connections
at ScanningResultIterator (#1517)"
7cb3162c2a is described below
commit 7cb3162c2a10238468e0b49153a6cf0c348989e1
Author: Kadir Ozdemir <[email protected]>
AuthorDate: Mon Nov 21 11:45:29 2022 -0800
Revert "PHOENIX-6776 :- Abort scans of closed connections at
ScanningResultIterator (#1517)"
This reverts commit 24887be9fb7d7d97b3c9db52882075a6d8452f45.
---
.../end2end/PreMatureTimelyAbortScanIt.java | 89 ----------------------
.../coprocessor/BaseScannerRegionObserver.java | 2 +-
.../apache/phoenix/exception/SQLExceptionCode.java | 5 +-
.../phoenix/iterate/BaseResultIterators.java | 3 -
.../phoenix/iterate/ScanningResultIterator.java | 26 +------
.../phoenix/iterate/TableResultIterator.java | 2 +-
.../iterate/TableSnapshotResultIterator.java | 7 +-
.../org/apache/phoenix/jdbc/PhoenixConnection.java | 23 +-----
.../phoenix/mapreduce/PhoenixRecordReader.java | 2 +-
9 files changed, 11 insertions(+), 148 deletions(-)
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PreMatureTimelyAbortScanIt.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PreMatureTimelyAbortScanIt.java
deleted file mode 100644
index c0729707be..0000000000
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PreMatureTimelyAbortScanIt.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end;
-
-import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
-import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.iterate.ScanningResultIterator;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.Map;
-
-import static org.junit.Assert.*;
-@Category(ParallelStatsDisabledTest.class)
-public class PreMatureTimelyAbortScanIt extends ParallelStatsDisabledIT {
- private static final Logger LOG =
LoggerFactory.getLogger(PreMatureTimelyAbortScanIt.class);
-
- @BeforeClass
- public static synchronized void doSetup() throws Exception {
- Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
- props.put(BaseScannerRegionObserver.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
Integer.toString(60*60)); // An hour
- props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION,
Boolean.toString(false));
- props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS,
Integer.toString(0));
- setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
- }
-
- private String getUniqueUrl() {
- return url + generateUniqueName();
- }
-
- @Test
- public void testPreMatureScannerAbortForCount() throws Exception {
-
- try (Connection conn = DriverManager.getConnection(getUniqueUrl())) {
- conn.createStatement().execute("CREATE TABLE LONG_BUG (ID INTEGER
PRIMARY KEY, AMOUNT DECIMAL) SALT_BUCKETS = 16 ");
- }
- try (Connection conn = DriverManager.getConnection(getUniqueUrl())) {
- for (int i = 0; i<100 ; i++) {
- int amount = -50000 + i;
- String s = "UPSERT INTO LONG_BUG (ID, AMOUNT) VALUES( " + i +
", " + amount + ")";
- conn.createStatement().execute(s);
- }
- conn.commit();
- }
-
- try {
- PhoenixConnection conn =
DriverManager.getConnection(getUniqueUrl()).unwrap(PhoenixConnection.class);
- ScanningResultIterator.setIsScannerClosedForcefully(true);
- ResultSet resultSet = conn.createStatement().executeQuery(
- "SELECT COUNT(*) FROM LONG_BUG WHERE ID % 2 = 0");
- conn.setIsClosing(true);
- resultSet.next();
- LOG.info("Count of modulus 2 for LONG_BUG :- " +
resultSet.getInt(1));
- fail("ResultSet should have been closed");
- } catch (SQLException sqe) {
-
assertEquals(SQLExceptionCode.FAILED_KNOWINGLY_FOR_TEST.getErrorCode(),
sqe.getErrorCode());
- } catch (Exception e) {
- fail();
- } finally {
- ScanningResultIterator.setIsScannerClosedForcefully(false);
- }
- }
-}
\ No newline at end of file
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index ab7119bfbc..9f47169fc3 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -262,7 +262,7 @@ abstract public class BaseScannerRegionObserver implements
RegionObserver {
// last possible moment. You need to swap the start/stop and make
the
// start exclusive and the stop inclusive.
ScanUtil.setupReverseScan(scan);
- if (!(scan.getFilter() instanceof PagedFilter)) {
+ if (scan.getFilter() != null && !(scan.getFilter() instanceof
PagedFilter)) {
byte[] pageSizeMsBytes =
scan.getAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS);
if (pageSizeMsBytes != null) {
scan.setFilter(new PagedFilter(scan.getFilter(),
getPageSizeMsForFilter(scan)));
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 771b06a42d..63b7ba0904 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -583,10 +583,7 @@ public enum SQLExceptionCode {
CANNOT_TRANSFORM_TABLE_WITH_APPEND_ONLY_SCHEMA(913, "43M24", "Cannot
transform a table with append-only schema."),
- CANNOT_TRANSFORM_TRANSACTIONAL_TABLE(914, "43M25", "Cannot transform a
transactional table."),
-
- //SQLCode for testing exceptions
- FAILED_KNOWINGLY_FOR_TEST(7777, "TEST", "Exception was thrown to test
something");
+ CANNOT_TRANSFORM_TRANSACTIONAL_TABLE(914, "43M25", "Cannot transform a
transactional table.");
private final int errorCode;
private final String sqlState;
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 7dfcba334a..cbbb5554ad 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -51,7 +51,6 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
-import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -1416,8 +1415,6 @@ public abstract class BaseResultIterators extends
ExplainTable implements Result
}
}
- } catch (CancellationException ce) {
- LOGGER.warn("Iterator scheduled to be executed in
Future was being cancelled", ce);
}
}
addIterator(iterators, concatIterators);
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
index 5b8da7edcc..8b30882325 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
@@ -53,33 +53,22 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.phoenix.compile.ExplainPlanAttributes
.ExplainPlanAttributesBuilder;
-import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.monitoring.CombinableMetric;
import org.apache.phoenix.monitoring.GlobalClientMetrics;
import org.apache.phoenix.monitoring.ScanMetricsHolder;
import org.apache.phoenix.schema.tuple.ResultTuple;
import org.apache.phoenix.schema.tuple.Tuple;
-import
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.util.ServerUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class ScanningResultIterator implements ResultIterator {
-
- private static final Logger LOG =
LoggerFactory.getLogger(ScanningResultIterator.class);
private final ResultScanner scanner;
private final ScanMetricsHolder scanMetricsHolder;
boolean scanMetricsUpdated;
boolean scanMetricsEnabled;
- private StatementContext context;
- private static boolean throwExceptionIfScannerClosedForceFully = false;
- public ScanningResultIterator(ResultScanner scanner, Scan scan,
ScanMetricsHolder scanMetricsHolder, StatementContext context) {
+ public ScanningResultIterator(ResultScanner scanner, Scan scan,
ScanMetricsHolder scanMetricsHolder) {
this.scanner = scanner;
this.scanMetricsHolder = scanMetricsHolder;
- this.context = context;
scanMetricsUpdated = false;
scanMetricsEnabled = scan.isScanMetricsEnabled();
}
@@ -170,14 +159,6 @@ public class ScanningResultIterator implements
ResultIterator {
try {
Result result = scanner.next();
while (result != null && (result.isEmpty() || isDummy(result))) {
- if (context.getConnection().isClosing() ||
context.getConnection().isClosed()) {
- LOG.warn("Closing ResultScanner as Connection is already
closed or in middle of closing");
- if (throwExceptionIfScannerClosedForceFully) {
- throw new
SQLExceptionInfo.Builder(SQLExceptionCode.FAILED_KNOWINGLY_FOR_TEST).build().buildException();
- }
- close();
- return null;
- }
result = scanner.next();
}
if (result == null) {
@@ -209,9 +190,4 @@ public class ScanningResultIterator implements
ResultIterator {
public ResultScanner getScanner() {
return scanner;
}
-
- @VisibleForTesting
- public static void setIsScannerClosedForcefully(boolean throwException) {
- throwExceptionIfScannerClosedForceFully = throwException;
- }
}
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index bc67f80cbf..9342ddc818 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -237,7 +237,7 @@ public class TableResultIterator implements ResultIterator {
if (delegate == UNINITIALIZED_SCANNER) {
try {
this.scanIterator =
- new
ScanningResultIterator(htable.getScanner(scan), scan, scanMetricsHolder,
plan.getContext());
+ new
ScanningResultIterator(htable.getScanner(scan), scan, scanMetricsHolder);
} catch (IOException e) {
Closeables.closeQuietly(htable);
throw ServerUtil.parseServerException(e);
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java
index 6bde117e71..e4e9bef910 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java
@@ -38,7 +38,6 @@ import
org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import
org.apache.phoenix.compile.ExplainPlanAttributes.ExplainPlanAttributesBuilder;
-import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.monitoring.ScanMetricsHolder;
import org.apache.phoenix.schema.tuple.Tuple;
@@ -75,14 +74,12 @@ public class TableSnapshotResultIterator implements
ResultIterator {
private FileSystem fs;
private int currentRegion;
private boolean closed = false;
- private StatementContext context;
- public TableSnapshotResultIterator(Configuration configuration, Scan scan,
ScanMetricsHolder scanMetricsHolder, StatementContext context)
+ public TableSnapshotResultIterator(Configuration configuration, Scan scan,
ScanMetricsHolder scanMetricsHolder)
throws IOException {
this.configuration = configuration;
this.currentRegion = -1;
this.scan = scan;
- this.context = context;
this.scanMetricsHolder = scanMetricsHolder;
this.scanIterator = UNINITIALIZED_SCANNER;
if (PhoenixConfigurationUtil.isMRSnapshotManagedExternally(configuration))
{
@@ -156,7 +153,7 @@ public class TableSnapshotResultIterator implements
ResultIterator {
RegionInfo hri = regions.get(this.currentRegion);
this.scanIterator =
new ScanningResultIterator(new SnapshotScanner(configuration, fs,
restoreDir, htd, hri, scan),
- scan, scanMetricsHolder, context);
+ scan, scanMetricsHolder);
} catch (Throwable e) {
throw ServerUtil.parseServerException(e);
}
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index f9b1aae654..774b0ce07d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -165,7 +165,6 @@ public class PhoenixConnection implements MetaDataMutated,
SQLCloseable, Phoenix
private int statementExecutionCounter;
private TraceScope traceScope = null;
private volatile boolean isClosed = false;
- private volatile boolean isClosing = false;
private Sampler<?> sampler;
private boolean readOnly = false;
private Consistency consistency = Consistency.STRONG;
@@ -701,7 +700,7 @@ public class PhoenixConnection implements MetaDataMutated,
SQLCloseable, Phoenix
}
void checkOpen() throws SQLException {
- if (isClosed || isClosing) {
+ if (isClosed) {
throw reasonForClose != null
? reasonForClose
: new
SQLExceptionInfo.Builder(SQLExceptionCode.CONNECTION_CLOSED)
@@ -720,7 +719,7 @@ public class PhoenixConnection implements MetaDataMutated,
SQLCloseable, Phoenix
* @see #close()
*/
public void close(SQLException reasonForClose) throws SQLException {
- if (isClosed || isClosing) {
+ if (isClosed) {
return;
}
this.reasonForClose = reasonForClose;
@@ -732,12 +731,11 @@ public class PhoenixConnection implements
MetaDataMutated, SQLCloseable, Phoenix
//Does this need to be synchronized?
@Override
synchronized public void close() throws SQLException {
- if (isClosed || isClosing) {
+ if (isClosed) {
return;
}
try {
- isClosing = true;
TableMetricsManager.pushMetricsFromConnInstanceMethod(getMutationMetrics());
if(!(reasonForClose instanceof FailoverSQLException)) {
// If the reason for close is because of failover, the metrics
will be kept for
@@ -759,7 +757,6 @@ public class PhoenixConnection implements MetaDataMutated,
SQLCloseable, Phoenix
}
} finally {
- isClosing = false;
isClosed = true;
if(isInternalConnection()){
GLOBAL_OPEN_INTERNAL_PHOENIX_CONNECTIONS.decrement();
@@ -968,10 +965,6 @@ public class PhoenixConnection implements MetaDataMutated,
SQLCloseable, Phoenix
return isClosed;
}
- public boolean isClosing() throws SQLException {
- return isClosing;
- }
-
@Override
public boolean isReadOnly() throws SQLException {
return readOnly;
@@ -980,7 +973,7 @@ public class PhoenixConnection implements MetaDataMutated,
SQLCloseable, Phoenix
@Override
public boolean isValid(int timeout) throws SQLException {
// TODO: run query here or ping
- return !isClosed && !isClosing;
+ return !isClosed;
}
@Override
@@ -1381,14 +1374,6 @@ public class PhoenixConnection implements
MetaDataMutated, SQLCloseable, Phoenix
this.tableResultIteratorFactory = factory;
}
- /**
- * Added for testing purposes. Do not use this elsewhere.
- */
- @VisibleForTesting
- public void setIsClosing(boolean imitateIsClosing) {
- isClosing = imitateIsClosing;
- }
-
@Override
public void removeSchema(PSchema schema, long schemaTimeStamp) {
metaData.removeSchema(schema, schemaTimeStamp);
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
index d18bcffac1..91d3da40b4 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
@@ -138,7 +138,7 @@ public class PhoenixRecordReader<T extends DBWritable>
extends RecordReader<Null
if (snapshotName != null) {
// result iterator to read snapshots
final TableSnapshotResultIterator
tableSnapshotResultIterator = new TableSnapshotResultIterator(configuration,
scan,
- scanMetricsHolder, queryPlan.getContext());
+ scanMetricsHolder);
peekingResultIterator =
LookAheadResultIterator.wrap(tableSnapshotResultIterator);
LOGGER.info("Adding TableSnapshotResultIterator for scan:
" + scan);
} else {