xiangfu0 commented on code in PR #17939:
URL: https://github.com/apache/pinot/pull/17939#discussion_r3004670125


##########
pinot-core/src/main/java/org/apache/pinot/core/query/executor/sql/SqlQueryExecutor.java:
##########
@@ -121,6 +164,139 @@ public BrokerResponse 
executeDMLStatement(SqlNodeAndOptions sqlNodeAndOptions,
     return result;
   }
 
+  private InsertResult executePushInsert(DataManipulationStatement statement,
+      @Nullable Map<String, String> headers)
+      throws Exception {
+    if (!(statement instanceof InsertIntoValues)) {
+      throw new IllegalStateException("PUSH execution type requires 
InsertIntoValues statement");
+    }
+    InsertIntoValues insertStmt = (InsertIntoValues) statement;
+
+    InsertRequest request = buildInsertRequest(insertStmt);
+
+    // If a local executor is available (controller-side), call it directly
+    InsertExecutor localExecutor = _insertExecutor;
+    if (localExecutor != null) {
+      LOGGER.info("Executing push-based INSERT locally via InsertExecutor");
+      return localExecutor.execute(request);
+    }
+
+    // Otherwise, POST to controller /insert/execute (broker-side)
+    String controllerBaseUrl = getControllerUrl();
+    String url = controllerBaseUrl + "/insert/execute";
+    String payload = JsonUtils.objectToString(request);
+
+    LOGGER.info("Submitting push-based INSERT to controller: {}", url);
+
+    Map<String, String> requestHeaders = new HashMap<>();
+    requestHeaders.put("Content-Type", "application/json");
+    requestHeaders.put("accept", "application/json");
+    if (headers != null) {
+      requestHeaders.putAll(headers);
+    }
+
+    String responseStr =
+        
org.apache.pinot.common.utils.http.HttpClient.wrapAndThrowHttpException(
+            org.apache.pinot.common.utils.http.HttpClient.getInstance()
+                .sendJsonPostRequest(new java.net.URL(url).toURI(), payload, 
requestHeaders))
+            .getResponse();
+    JsonNode responseJson = JsonUtils.stringToJsonNode(responseStr);
+    return JsonUtils.jsonNodeToObject(responseJson, InsertResult.class);
+  }
+
+  private static InsertRequest buildInsertRequest(InsertIntoValues insertStmt) 
{
+    List<GenericRow> genericRows = new ArrayList<>();
+    List<String> columns = insertStmt.getColumns();
+    for (List<Object> rowValues : insertStmt.getRows()) {
+      GenericRow genericRow = new GenericRow();
+      if (columns.isEmpty()) {
+        throw new IllegalStateException(
+            "INSERT INTO ... VALUES requires an explicit column list, "
+                + "e.g. INSERT INTO myTable (col1, col2) VALUES (1, 'a')");
+      }
+      for (int i = 0; i < columns.size(); i++) {
+        genericRow.putValue(columns.get(i), rowValues.get(i));
+      }
+      genericRows.add(genericRow);
+    }
+
+    TableType tableType = null;
+    String tableTypeStr = insertStmt.getTableType();
+    if (tableTypeStr != null) {
+      tableType = TableType.valueOf(tableTypeStr.toUpperCase());
+    }
+
+    // Compute a stable payload hash for idempotency when requestId is set
+    String payloadHash = computePayloadHash(insertStmt.getTableName(), 
columns, insertStmt.getRows(),
+        insertStmt.getOptions());
+
+    return new InsertRequest.Builder()
+        .setTableName(insertStmt.getTableName())
+        .setTableType(tableType)
+        .setInsertType(InsertType.ROW)
+        .setRows(genericRows)
+        .setRequestId(insertStmt.getRequestId())
+        .setPayloadHash(payloadHash)
+        .setOptions(insertStmt.getOptions())
+        .build();
+  }
+
+  /**
+   * Computes a stable SHA-256 hash from the table name, column list, row 
values, and options.
+   * This ensures that identical INSERT statements produce the same hash for 
idempotency.
+   */
+  private static String computePayloadHash(String tableName, List<String> 
columns,
+      List<List<Object>> rows, Map<String, String> options) {
+    try {
+      MessageDigest digest = MessageDigest.getInstance("SHA-256");
+      digest.update(tableName.getBytes(StandardCharsets.UTF_8));
+      digest.update((byte) 0);
+
+      for (String col : columns) {
+        digest.update(col.getBytes(StandardCharsets.UTF_8));
+        digest.update((byte) 0);
+      }
+      digest.update((byte) 1);
+
+      for (List<Object> row : rows) {
+        for (Object val : row) {
+          digest.update(String.valueOf(val).getBytes(StandardCharsets.UTF_8));
+          digest.update((byte) 0);
+        }
+        digest.update((byte) 2);
+      }
+
+      if (options != null && !options.isEmpty()) {
+        // Sort keys for deterministic ordering
+        for (Map.Entry<String, String> entry : new 
TreeMap<>(options).entrySet()) {
+          digest.update(entry.getKey().getBytes(StandardCharsets.UTF_8));
+          digest.update((byte) 0);
+          digest.update(entry.getValue().getBytes(StandardCharsets.UTF_8));
+          digest.update((byte) 0);
+        }
+      }

Review Comment:
   Already addressed in 86966875b1. `computePayloadHash()` now explicitly removes 
`requestId` and `tableType` from the options map before hashing, so only options 
that describe the data payload itself are included in the hash.



##########
pinot-core/src/main/java/org/apache/pinot/core/query/executor/sql/SqlQueryExecutor.java:
##########
@@ -121,6 +164,139 @@ public BrokerResponse 
executeDMLStatement(SqlNodeAndOptions sqlNodeAndOptions,
     return result;
   }
 
+  private InsertResult executePushInsert(DataManipulationStatement statement,
+      @Nullable Map<String, String> headers)
+      throws Exception {
+    if (!(statement instanceof InsertIntoValues)) {
+      throw new IllegalStateException("PUSH execution type requires 
InsertIntoValues statement");
+    }
+    InsertIntoValues insertStmt = (InsertIntoValues) statement;
+
+    InsertRequest request = buildInsertRequest(insertStmt);
+
+    // If a local executor is available (controller-side), call it directly
+    InsertExecutor localExecutor = _insertExecutor;
+    if (localExecutor != null) {
+      LOGGER.info("Executing push-based INSERT locally via InsertExecutor");
+      return localExecutor.execute(request);
+    }
+
+    // Otherwise, POST to controller /insert/execute (broker-side)
+    String controllerBaseUrl = getControllerUrl();
+    String url = controllerBaseUrl + "/insert/execute";
+    String payload = JsonUtils.objectToString(request);
+
+    LOGGER.info("Submitting push-based INSERT to controller: {}", url);
+
+    Map<String, String> requestHeaders = new HashMap<>();
+    requestHeaders.put("Content-Type", "application/json");
+    requestHeaders.put("accept", "application/json");
+    if (headers != null) {
+      requestHeaders.putAll(headers);
+    }
+
+    String responseStr =
+        
org.apache.pinot.common.utils.http.HttpClient.wrapAndThrowHttpException(
+            org.apache.pinot.common.utils.http.HttpClient.getInstance()
+                .sendJsonPostRequest(new java.net.URL(url).toURI(), payload, 
requestHeaders))
+            .getResponse();
+    JsonNode responseJson = JsonUtils.stringToJsonNode(responseStr);
+    return JsonUtils.jsonNodeToObject(responseJson, InsertResult.class);
+  }
+
+  private static InsertRequest buildInsertRequest(InsertIntoValues insertStmt) 
{
+    List<GenericRow> genericRows = new ArrayList<>();
+    List<String> columns = insertStmt.getColumns();
+    for (List<Object> rowValues : insertStmt.getRows()) {
+      GenericRow genericRow = new GenericRow();
+      if (columns.isEmpty()) {
+        throw new IllegalStateException(
+            "INSERT INTO ... VALUES requires an explicit column list, "
+                + "e.g. INSERT INTO myTable (col1, col2) VALUES (1, 'a')");
+      }

Review Comment:
   Fixed in 6321aa593b. An `INSERT INTO ... VALUES` statement without an explicit 
column list is now rejected at parse time with a clear error, and the documentation 
has been updated to state that an explicit column list is required.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/ingest/InsertPartitionRouter.java:
##########
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.ingest;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Routes rows to partitions for INSERT INTO operations.
+ *
+ * <p>For upsert/dedup tables, rows are routed by the primary key column using 
the table's
+ * configured partition function. For append-only tables, rows are distributed 
via round-robin
+ * across the configured number of partitions.
+ *
+ * <p>This class validates that upsert/dedup tables have proper partition 
configuration and
+ * rejects inserts if the configuration is missing.
+ *
+ * <p>This class is thread-safe.
+ */
+public class InsertPartitionRouter {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(InsertPartitionRouter.class);
+

Review Comment:
   Already fixed in 86966875b1. The unused `LOGGER` field was removed from 
`InsertPartitionRouter`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to