virajjasani commented on code in PR #2209:
URL: https://github.com/apache/phoenix/pull/2209#discussion_r2191000315


##########
phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCCompactionUtil.java:
##########
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
+import org.apache.hadoop.hbase.client.CheckAndMutateResult;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.expression.function.PartitionIdFunction;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.types.PDate;
+import org.apache.phoenix.util.CDCUtil;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.JacksonUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.phoenix.query.QueryConstants.NAME_SEPARATOR;
+
+/**
+ * Utility class for CDC (Change Data Capture) operations during compaction.
+ * This class contains utilities for handling TTL row expiration events and generating
+ * CDC events with pre-image data that are written directly to CDC index tables.
+ */
+public final class CDCCompactionUtil {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(CDCCompactionUtil.class);
+
+    private CDCCompactionUtil() {
+        // empty
+    }
+
+    /**
+     * Finds the column name for a given cell in the data table.
+     *
+     * @param dataTable The data table
+     * @param cell      The cell
+     * @return The column name or null if not found
+     */
+    static String findColumnName(PTable dataTable, Cell cell) {
+        try {
+            byte[] family = CellUtil.cloneFamily(cell);
+            byte[] qualifier = CellUtil.cloneQualifier(cell);
+            byte[] defaultCf = dataTable.getDefaultFamilyName() != null
+                    ? dataTable.getDefaultFamilyName().getBytes()
+                    : QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
+            for (PColumn column : dataTable.getColumns()) {
+                if (column.getFamilyName() != null
+                        && Bytes.equals(family, column.getFamilyName().getBytes())
+                        && Bytes.equals(qualifier, column.getColumnQualifierBytes())) {
+                    if (Bytes.equals(defaultCf, column.getFamilyName().getBytes())) {
+                        return column.getName().getString();
+                    } else {
+                        return column.getFamilyName().getString() + NAME_SEPARATOR
+                                + column.getName().getString();
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOGGER.error("Error finding column name for cell: {}", CellUtil.toString(cell, true),
+                    e);
+        }
+        return null;
+    }
+
+    /**
+     * Creates a CDC event map for TTL delete with pre-image data.
+     *
+     * @param expiredRowPut The expired row data
+     * @param dataTable     The data table
+     * @param preImage      Pre-image map
+     * @return CDC event map
+     */
+    static Map<String, Object> createTTLDeleteCDCEvent(Put expiredRowPut, PTable dataTable,
+                                                       Map<String, Object> preImage)
+            throws Exception {
+        Map<String, Object> cdcEvent = new HashMap<>();
+        cdcEvent.put(QueryConstants.CDC_EVENT_TYPE, QueryConstants.CDC_TTL_DELETE_EVENT_TYPE);
+        for (List<Cell> familyCells : expiredRowPut.getFamilyCellMap().values()) {
+            for (Cell cell : familyCells) {
+                String columnName = findColumnName(dataTable, cell);
+                if (columnName != null) {
+                    PColumn column = dataTable.getColumnForColumnQualifier(
+                            CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell));
+                    Object value = column.getDataType().toObject(cell.getValueArray(),
+                            cell.getValueOffset(),
+                            cell.getValueLength());
+                    Object encodedValue =
+                            CDCUtil.getColumnEncodedValue(value, column.getDataType());
+                    preImage.put(columnName, encodedValue);
+                }
+            }
+        }
+        cdcEvent.put(QueryConstants.CDC_PRE_IMAGE, preImage);
+        cdcEvent.put(QueryConstants.CDC_POST_IMAGE, Collections.emptyMap());
+        return cdcEvent;
+    }
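
For context, a minimal sketch of how the event map built above might be turned into the serialized bytes for a CDC index row. The helper method name, the column names and values, and the exact JSON key strings behind the QueryConstants fields are illustrative assumptions, not the PR's actual code:

```java
// Illustrative only: expiredRowPut and dataTable are supplied by the compaction
// path; the JSON shape assumes the QueryConstants keys map to these strings.
static byte[] buildTtlDeleteEventBytes(Put expiredRowPut, PTable dataTable) throws Exception {
    Map<String, Object> preImage = new HashMap<>();
    Map<String, Object> cdcEvent =
            CDCCompactionUtil.createTTLDeleteCDCEvent(expiredRowPut, dataTable, preImage);
    // For an expired row with columns COL1 and CF2.COL2, the serialized event
    // would look roughly like:
    // {"event_type":"ttl_delete","pre_image":{"COL1":42,"CF2.COL2":"abc"},"post_image":{}}
    return JacksonUtil.getObjectWriter().writeValueAsBytes(cdcEvent);
}
```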

Review Comment:
   Also, given the increasing complexity of CompactionScanner, we need to keep the workflow as simple as possible. In the worst case, if the mutation to the CDC index fails after the configured number of attempts, we still must not block the compaction, because the cost could be very high depending on the size of the rows, tables, regions, and stores.
   
   That's another reason I wanted to keep the workflow as simple as the following (see the sketch below):
   - Generate the mutation and perform CheckAndMutate (this succeeds in the single-CF case)
   - If CheckAndMutate fails, merge the image (the multi-CF case; simple enough, because the latest mutation is retrieved quickly from the region memstore)
   - If CheckAndMutate fails due to an UPSERT event at the exact same timestamp, ignore the CDC event for the TTL expiration: even though the TTL has expired, the new UPSERT happened at that exact moment, so the upsert event takes precedence (an extremely rare case)
   
   I found keeping the above logic in one place (because TTL_DELETE is a special event), in a static utility, more manageable than changing CDCTableInfo and creating new CDCTableInfo and CDCChangeBuilder objects for each TTL-expired row. As it stands, we do not even create a new object of CDCCompactionUtil.
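   
   For illustration, a minimal, hedged sketch of that three-step CheckAndMutate flow against the stock HBase client API. All the wiring here is hypothetical (the table handle, row key, serialized event bytes, and the isUpsertEvent/mergeCdcImages helpers); it shows the control flow under those assumptions, not the PR's implementation:

```java
import java.io.IOException;

import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.CheckAndMutateResult;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;

// Hedged sketch only: all helper names and wiring below are hypothetical.
final class TtlDeleteEventSketch {

    static void writeTtlDeleteEvent(Table cdcIndexTable, byte[] cdcRowKey, byte[] family,
            byte[] qualifier, long eventTimestamp, byte[] eventBytes) throws IOException {
        Put ttlDeletePut = new Put(cdcRowKey, eventTimestamp);
        ttlDeletePut.addColumn(family, qualifier, eventTimestamp, eventBytes);

        // Step 1: succeeds if no CDC event exists yet at this row key (single-CF case).
        CheckAndMutateResult result = cdcIndexTable.checkAndMutate(
                CheckAndMutate.newBuilder(cdcRowKey)
                        .ifNotExists(family, qualifier)
                        .build(ttlDeletePut));

        if (!result.isSuccess()) {
            // Step 2: another store already wrote an event for this row (multi-CF
            // case). The cell is still in the region memstore, so the read is cheap;
            // merge the pre-images and retry, conditioned on the exact bytes just
            // read so a concurrent writer cannot be silently overwritten.
            Get get = new Get(cdcRowKey).addColumn(family, qualifier)
                    .setTimestamp(eventTimestamp);
            byte[] existing = cdcIndexTable.get(get).getValue(family, qualifier);
            if (existing != null && !isUpsertEvent(existing)) {
                byte[] merged = mergeCdcImages(existing, eventBytes);
                Put mergedPut = new Put(cdcRowKey, eventTimestamp);
                mergedPut.addColumn(family, qualifier, eventTimestamp, merged);
                cdcIndexTable.checkAndMutate(
                        CheckAndMutate.newBuilder(cdcRowKey)
                                .ifEquals(family, qualifier, existing)
                                .build(mergedPut));
            }
            // Step 3: an UPSERT landed at the exact same timestamp; the upsert
            // event takes precedence, so the TTL_DELETE event is dropped.
        }
    }

    // Hypothetical helpers: event-type inspection and pre-image merge over the
    // serialized JSON event; placeholders only.
    private static boolean isUpsertEvent(byte[] serializedEvent) {
        return false; // placeholder
    }

    private static byte[] mergeCdcImages(byte[] existing, byte[] incoming) {
        return incoming; // placeholder
    }
}
```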


