This is an automated email from the ASF dual-hosted git repository.
tdsilva pushed a commit to branch 4.x-HBase-1.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.2 by this push:
new defb57a PHOENIX-5073 modify index state based on client version to
support old clients
defb57a is described below
commit defb57a09a50c3038421688213a10754fcb2897d
Author: kiran.maturi <[email protected]>
AuthorDate: Fri Dec 28 14:46:19 2018 +0530
PHOENIX-5073 modify index state based on client version to support old
clients
---
.../index/InvalidIndexStateClientSideIT.java | 145 +++++++++++++++++++++
.../phoenix/coprocessor/MetaDataEndpointImpl.java | 41 ++++--
2 files changed, 175 insertions(+), 11 deletions(-)
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/InvalidIndexStateClientSideIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/InvalidIndexStateClientSideIT.java
new file mode 100644
index 0000000..7052ade
--- /dev/null
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/InvalidIndexStateClientSideIT.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import static
org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MAJOR_VERSION;
+import static
org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_PATCH_NUMBER;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetTableRequest;
+import
org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.hbase.index.util.VersionUtil;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.Closeables;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Test;
+
+import com.sun.org.apache.commons.logging.Log;
+import com.sun.org.apache.commons.logging.LogFactory;
+
+public class InvalidIndexStateClientSideIT extends ParallelStatsDisabledIT {
+ private static final Log LOG =
LogFactory.getLog(InvalidIndexStateClientSideIT.class);
+
+ @Test
+ public void testCachedConnections() throws Throwable {
+ final String schemaName = generateUniqueName();
+ final String tableName = generateUniqueName();
+ final String fullTableName = SchemaUtil.getTableName(schemaName,
tableName);
+ final String indexName = generateUniqueName();
+ final String fullIndexName = SchemaUtil.getTableName(schemaName,
indexName);
+ final Connection conn = DriverManager.getConnection(getUrl());
+
+ // create table and indices
+ String createTableSql =
+ "CREATE TABLE " + fullTableName
+ + "(org_id VARCHAR NOT NULL PRIMARY KEY, v1 INTEGER,
v2 INTEGER, v3 INTEGER)";
+ conn.createStatement().execute(createTableSql);
+ conn.createStatement()
+ .execute("CREATE INDEX " + indexName + " ON " + fullTableName
+ "(v1)");
+ conn.commit();
+ PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class);
+ ConnectionQueryServices queryServices = phoenixConn.getQueryServices();
+ HTableInterface metaTable =
+ phoenixConn.getQueryServices()
+
.getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+ long ts = EnvironmentEdgeManager.currentTimeMillis();
+ MutationCode code =
+ IndexUtil
+ .updateIndexState(fullIndexName, ts, metaTable,
PIndexState.PENDING_DISABLE)
+ .getMutationCode();
+ assertEquals(MutationCode.TABLE_ALREADY_EXISTS, code);
+ ts = EnvironmentEdgeManager.currentTimeMillis();
+
+ final byte[] schemaBytes = PVarchar.INSTANCE.toBytes(schemaName);
+ final byte[] tableBytes = PVarchar.INSTANCE.toBytes(tableName);
+ PName tenantId = phoenixConn.getTenantId();
+ final long tableTimestamp = HConstants.LATEST_TIMESTAMP;
+ long tableResolvedTimestamp = HConstants.LATEST_TIMESTAMP;
+ final long resolvedTimestamp = tableResolvedTimestamp;
+ final byte[] tenantIdBytes =
+ tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY :
tenantId.getBytes();
+ byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaBytes,
tableBytes);
+ Batch.Call<MetaDataService, MetaDataResponse> callable =
+ new Batch.Call<MetaDataService, MetaDataResponse>() {
+ @Override
+ public MetaDataResponse call(MetaDataService instance)
throws IOException {
+ ServerRpcController controller = new
ServerRpcController();
+ BlockingRpcCallback<MetaDataResponse> rpcCallback =
+ new BlockingRpcCallback<MetaDataResponse>();
+ GetTableRequest.Builder builder =
GetTableRequest.newBuilder();
+ builder.setTenantId(ByteStringer.wrap(tenantIdBytes));
+ builder.setSchemaName(ByteStringer.wrap(schemaBytes));
+ builder.setTableName(ByteStringer.wrap(tableBytes));
+ builder.setTableTimestamp(tableTimestamp);
+ builder.setClientTimestamp(resolvedTimestamp);
+
builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION,
+ 13, PHOENIX_PATCH_NUMBER));
+ builder.setSkipAddingParentColumns(false);
+ builder.setSkipAddingIndexes(false);
+ instance.getTable(controller, builder.build(),
rpcCallback);
+ if (controller.getFailedOn() != null) {
+ throw controller.getFailedOn();
+ }
+ return rpcCallback.get();
+ }
+ };
+ int version = VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, 13,
PHOENIX_PATCH_NUMBER);
+ LOG.info("Client version: " + version);
+ HTableInterface ht =
+
queryServices.getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+ try {
+ final Map<byte[], MetaDataResponse> results =
+ ht.coprocessorService(MetaDataService.class, tableKey,
tableKey, callable);
+
+ assert (results.size() == 1);
+ MetaDataResponse result = results.values().iterator().next();
+ assert (result.getTable().getIndexesCount() == 1);
+ assert
(PIndexState.valueOf(result.getTable().getIndexes(0).getIndexState())
+ .equals(PIndexState.DISABLE));
+ } catch (Exception e) {
+ System.out.println("Exception Occurred: " + e);
+
+ } finally {
+ Closeables.closeQuietly(ht);
+ }
+
+ }
+
+}
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index cd9efee..799a41b 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -1403,17 +1403,7 @@ public class MetaDataEndpointImpl extends
MetaDataProtocol implements Coprocesso
PIndexState indexState =
indexStateKv == null ? null :
PIndexState.fromSerializedValue(indexStateKv
.getValueArray()[indexStateKv.getValueOffset()]);
- // If client is not yet up to 4.12, then translate PENDING_ACTIVE to
ACTIVE (as would have been
- // the value in those versions) since the client won't have this index
state in its enum.
- if (indexState == PIndexState.PENDING_ACTIVE && clientVersion <
MetaDataProtocol.MIN_PENDING_ACTIVE_INDEX) {
- indexState = PIndexState.ACTIVE;
- }
- // If client is not yet up to 4.14, then translate PENDING_DISABLE to
DISABLE
- // since the client won't have this index state in its enum.
- if (indexState == PIndexState.PENDING_DISABLE && clientVersion <
MetaDataProtocol.MIN_PENDING_DISABLE_INDEX) {
- // note: for older clients, we have to rely on the rebuilder to
transition PENDING_DISABLE -> DISABLE
- indexState = PIndexState.DISABLE;
- }
+
Cell immutableRowsKv = tableKeyValues[IMMUTABLE_ROWS_INDEX];
boolean isImmutableRows =
immutableRowsKv == null ? false : (Boolean)
PBoolean.INSTANCE.toObject(
@@ -1596,6 +1586,34 @@ public class MetaDataEndpointImpl extends
MetaDataProtocol implements Coprocesso
decodeViewIndexId(viewIndexIdKv, viewIndexIdType);
}
+ private PTable modifyIndexStateForOldClient(int clientVersion, PTable
table)
+ throws SQLException {
+ if (table == null) {
+ return table;
+ }
+ // PHOENIX-5073 Sets the index state based on the client version in
case of old clients.
+ // If client is not yet up to 4.12, then translate PENDING_ACTIVE to
ACTIVE (as would have
+ // been the value in those versions) since the client won't have this
index state in its
+ // enum.
+ if (table.getIndexState() == PIndexState.PENDING_ACTIVE
+ && clientVersion < MetaDataProtocol.MIN_PENDING_ACTIVE_INDEX) {
+ table =
+ PTableImpl.builderWithColumns(table,
PTableImpl.getColumnsToClone(table))
+ .setState(PIndexState.ACTIVE).build();
+ }
+ // If client is not yet up to 4.14, then translate PENDING_DISABLE to
DISABLE
+ // since the client won't have this index state in its enum.
+ if (table.getIndexState() == PIndexState.PENDING_DISABLE
+ && clientVersion < MetaDataProtocol.MIN_PENDING_DISABLE_INDEX)
{
+ // note: for older clients, we have to rely on the rebuilder to
transition
+ // PENDING_DISABLE -> DISABLE
+ table =
+ PTableImpl.builderWithColumns(table,
PTableImpl.getColumnsToClone(table))
+ .setState(PIndexState.DISABLE).build();
+ }
+ return table;
+ }
+
/**
* Returns viewIndexId based on its underlying data type
*
@@ -3675,6 +3693,7 @@ public class MetaDataEndpointImpl extends
MetaDataProtocol implements Coprocesso
PTable table =
getTableFromCache(cacheKey, clientTimeStamp,
clientVersion, skipAddingIndexes,
skipAddingParentColumns, lockedAncestorTable);
+ table = modifyIndexStateForOldClient(clientVersion, table);
// We only cache the latest, so we'll end up building the table
with every call if the
// client connection has specified an SCN.
// TODO: If we indicate to the client that we're returning an
older version, but there's a