virajjasani commented on a change in pull request #913:
URL: https://github.com/apache/phoenix/pull/913#discussion_r539235012
##########
File path:
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
##########
@@ -644,4 +651,33 @@ public boolean next(List<Cell> resultsToReturn) throws
IOException {
public long getMaxResultSize() {
return scan.getMaxResultSize();
}
+
+ private void
annotateDataMutations(UngroupedAggregateRegionObserver.MutationList
mutationsList,
+ Scan scan) {
+ byte[] tenantId = null;
Review comment:
nit: we are assigning these variables representing MutationMetadataType
twice (`null` and `scan.getAttribute()`), should we assign all of them just
once by removing null assignments?
##########
File path:
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
##########
@@ -64,6 +64,7 @@
import org.apache.phoenix.cache.GlobalCache;
import org.apache.phoenix.cache.TenantCache;
import org.apache.phoenix.exception.DataExceedsCapacityException;
+import org.apache.phoenix.execute.MutationState;
Review comment:
nit: redundant?
##########
File path:
phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
##########
@@ -2102,6 +2106,17 @@ private PTable createTableInternal(CreateTableStatement
statement, byte[][] spli
}
}
+ Boolean isChangeDetectionEnabledProp =
+ (Boolean)
TableProperty.CHANGE_DETECTION_ENABLED.getValue(tableProps);
+ if (isChangeDetectionEnabledProp != null) {
+ if (tableType != TABLE && tableType != VIEW) {
+ throw new SQLExceptionInfo.Builder(
+
SQLExceptionCode.CHANGE_DETECTION_SUPPORTED_FOR_TABLES_AND_VIEWS_ONLY)
Review comment:
If possible, we can refactor this validation in a small util method and
it can be used by both workflows: createTable/createIndex and
addColumn/alterIndex?
##########
File path:
phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
##########
@@ -682,6 +687,49 @@ private void generateMutations(final TableRef tableRef,
final long mutationTimes
values.putAll(modifiedValues);
}
+ public static void annotateMutationsWithMetadata(PTable table,
List<Mutation> rowMutations) {
+ //only annotate if the change detection flag is on the table and HBase
supports
+ // preWALAppend coprocs server-side
+ if (table == null || !table.isChangeDetectionEnabled()
+ || !HbaseCompatCapabilities.hasPreWALAppend()) {
+ return;
+ }
+ //annotate each mutation with enough metadata so that anyone
interested can
+ // deterministically figure out exactly what Phoenix schema object
created the mutation
+ // Server-side we can annotate the HBase WAL with these.
+ for (Mutation mutation : rowMutations) {
+ annotateMutationWithMetadata(table, mutation);
+ }
+
+ }
+
+ public static void annotateMutationWithMetadata(PTable table, Mutation
mutation) {
Review comment:
nit: we can keep both util methods `private`?
##########
File path:
phoenix-core/src/main/java/org/apache/phoenix/util/WALAnnotationUtil.java
##########
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.phoenix.compat.hbase.coprocessor.CompatIndexRegionObserver;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+
+import java.util.Map;
+
+/**
+ * Utility functions shared between IndexRegionObserver and GlobalIndexChecker
for annotating the
+ * HBase WAL with Phoenix-level metadata about mutations.
+ */
+public class WALAnnotationUtil {
+
+ public static void appendMutationAttributesToWALKey(WALKey key,
+ IndexRegionObserver.BatchMutateContext
context) {
+ if (context != null && context.getOriginalMutations().size() > 0) {
+ Mutation firstMutation = context.getOriginalMutations().get(0);
+ Map<String, byte[]> attrMap = firstMutation.getAttributesMap();
+ for (MutationState.MutationMetadataType metadataType :
+ MutationState.MutationMetadataType.values()) {
+ String metadataTypeKey = metadataType.toString();
+ if (attrMap.containsKey(metadataTypeKey)) {
+ CompatIndexRegionObserver.appendToWALKey(key,
metadataTypeKey,
Review comment:
`appendToWALKey()` is public only in 1.5 compact class but not in
1.3/1.4 compact classes, without which this could cause compilation issue while
running against 1.3/1.4.
##########
File path:
phoenix-hbase-compat-1.3.0/src/main/java/org/apache/phoenix/compat/hbase/coprocessor/CompatIndexRegionObserver.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.compat.hbase.coprocessor;
+
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKey;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class CompatIndexRegionObserver extends BaseRegionObserver {
+ public static String PHOENIX_APPEND_METADATA_TO_WAL =
"phoenix.append.metadata.to.wal";
+ public static boolean DEFAULT_PHOENIX_APPEND_METADATA_TO_WAL = false;
Review comment:
nit: should we avoid defining constants in Compat classes?
##########
File path:
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
##########
@@ -1160,6 +1169,12 @@ private PTable getTable(RegionScanner scanner, long
clientTimeStamp, long tableT
null :
PLong.INSTANCE.getCodec().decodeLong(lastDDLTimestampKv.getValueArray(),
lastDDLTimestampKv.getValueOffset(), SortOrder.getDefault());
+ Cell changeDetectionEnabledKv =
tableKeyValues[CHANGE_DETECTION_ENABLED_INDEX];
+ boolean isChangeDetectionEnabled = changeDetectionEnabledKv != null
Review comment:
In case of old client - new server case, `changeDetectionEnabledKv` will
be null right?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]