Copilot commented on code in PR #17939:
URL: https://github.com/apache/pinot/pull/17939#discussion_r2998612747
##########
pinot-core/src/main/java/org/apache/pinot/core/query/executor/sql/SqlQueryExecutor.java:
##########
@@ -121,6 +164,139 @@ public BrokerResponse
executeDMLStatement(SqlNodeAndOptions sqlNodeAndOptions,
return result;
}
+ private InsertResult executePushInsert(DataManipulationStatement statement,
+ @Nullable Map<String, String> headers)
+ throws Exception {
+ if (!(statement instanceof InsertIntoValues)) {
+ throw new IllegalStateException("PUSH execution type requires
InsertIntoValues statement");
+ }
+ InsertIntoValues insertStmt = (InsertIntoValues) statement;
+
+ InsertRequest request = buildInsertRequest(insertStmt);
+
+ // If a local executor is available (controller-side), call it directly
+ InsertExecutor localExecutor = _insertExecutor;
+ if (localExecutor != null) {
+ LOGGER.info("Executing push-based INSERT locally via InsertExecutor");
+ return localExecutor.execute(request);
+ }
+
+ // Otherwise, POST to controller /insert/execute (broker-side)
+ String controllerBaseUrl = getControllerUrl();
+ String url = controllerBaseUrl + "/insert/execute";
+ String payload = JsonUtils.objectToString(request);
+
+ LOGGER.info("Submitting push-based INSERT to controller: {}", url);
+
+ Map<String, String> requestHeaders = new HashMap<>();
+ requestHeaders.put("Content-Type", "application/json");
+ requestHeaders.put("accept", "application/json");
+ if (headers != null) {
+ requestHeaders.putAll(headers);
+ }
+
+ String responseStr =
+
org.apache.pinot.common.utils.http.HttpClient.wrapAndThrowHttpException(
+ org.apache.pinot.common.utils.http.HttpClient.getInstance()
+ .sendJsonPostRequest(new java.net.URL(url).toURI(), payload,
requestHeaders))
+ .getResponse();
+ JsonNode responseJson = JsonUtils.stringToJsonNode(responseStr);
+ return JsonUtils.jsonNodeToObject(responseJson, InsertResult.class);
+ }
+
+ private static InsertRequest buildInsertRequest(InsertIntoValues insertStmt)
{
+ List<GenericRow> genericRows = new ArrayList<>();
+ List<String> columns = insertStmt.getColumns();
+ for (List<Object> rowValues : insertStmt.getRows()) {
+ GenericRow genericRow = new GenericRow();
+ if (columns.isEmpty()) {
+ throw new IllegalStateException(
+ "INSERT INTO ... VALUES requires an explicit column list, "
+ + "e.g. INSERT INTO myTable (col1, col2) VALUES (1, 'a')");
+ }
+ for (int i = 0; i < columns.size(); i++) {
+ genericRow.putValue(columns.get(i), rowValues.get(i));
+ }
+ genericRows.add(genericRow);
+ }
+
+ TableType tableType = null;
+ String tableTypeStr = insertStmt.getTableType();
+ if (tableTypeStr != null) {
+ tableType = TableType.valueOf(tableTypeStr.toUpperCase());
+ }
+
+ // Compute a stable payload hash for idempotency when requestId is set
+ String payloadHash = computePayloadHash(insertStmt.getTableName(),
columns, insertStmt.getRows(),
+ insertStmt.getOptions());
+
+ return new InsertRequest.Builder()
+ .setTableName(insertStmt.getTableName())
+ .setTableType(tableType)
+ .setInsertType(InsertType.ROW)
+ .setRows(genericRows)
+ .setRequestId(insertStmt.getRequestId())
+ .setPayloadHash(payloadHash)
+ .setOptions(insertStmt.getOptions())
+ .build();
+ }
+
+ /**
+ * Computes a stable SHA-256 hash from the table name, column list, row
values, and options.
+ * This ensures that identical INSERT statements produce the same hash for
idempotency.
+ */
+ private static String computePayloadHash(String tableName, List<String>
columns,
+ List<List<Object>> rows, Map<String, String> options) {
+ try {
+ MessageDigest digest = MessageDigest.getInstance("SHA-256");
+ digest.update(tableName.getBytes(StandardCharsets.UTF_8));
+ digest.update((byte) 0);
+
+ for (String col : columns) {
+ digest.update(col.getBytes(StandardCharsets.UTF_8));
+ digest.update((byte) 0);
+ }
+ digest.update((byte) 1);
+
+ for (List<Object> row : rows) {
+ for (Object val : row) {
+ digest.update(String.valueOf(val).getBytes(StandardCharsets.UTF_8));
+ digest.update((byte) 0);
+ }
+ digest.update((byte) 2);
+ }
+
+ if (options != null && !options.isEmpty()) {
+ // Sort keys for deterministic ordering
+ for (Map.Entry<String, String> entry : new
TreeMap<>(options).entrySet()) {
+ digest.update(entry.getKey().getBytes(StandardCharsets.UTF_8));
+ digest.update((byte) 0);
+ digest.update(entry.getValue().getBytes(StandardCharsets.UTF_8));
+ digest.update((byte) 0);
+ }
+ }
Review Comment:
`computePayloadHash(...)` currently hashes all `options`, which for SQL inserts
include `requestId` and possibly other non-payload settings. This can cause a
false `IDEMPOTENCY_CONFLICT` when the same `requestId` is retried through a
different client that computes `payloadHash` over the data payload only.
Consider explicitly excluding `requestId` (and any other idempotency/control
options) from the hash, and defining the payload-hash contract as covering only
the table name, columns, and values (and possibly `tableType`).
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/ingest/InsertPartitionRouter.java:
##########
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.ingest;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.partition.PartitionFunction;
+import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Routes rows to partitions for INSERT INTO operations.
+ *
+ * <p>For upsert/dedup tables, rows are routed by the primary key column using
the table's
+ * configured partition function. For append-only tables, rows are distributed
via round-robin
+ * across the configured number of partitions.
+ *
+ * <p>This class validates that upsert/dedup tables have proper partition
configuration and
+ * rejects inserts if the configuration is missing.
+ *
+ * <p>This class is thread-safe.
+ */
+public class InsertPartitionRouter {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(InsertPartitionRouter.class);
+
Review Comment:
`InsertPartitionRouter` declares a `LOGGER` that is never used. Please
remove it (and any now-unused imports) to avoid checkstyle/static-analysis
failures and keep the class minimal.
##########
pinot-core/src/main/java/org/apache/pinot/core/query/executor/sql/SqlQueryExecutor.java:
##########
@@ -121,6 +164,139 @@ public BrokerResponse
executeDMLStatement(SqlNodeAndOptions sqlNodeAndOptions,
return result;
}
+ private InsertResult executePushInsert(DataManipulationStatement statement,
+ @Nullable Map<String, String> headers)
+ throws Exception {
+ if (!(statement instanceof InsertIntoValues)) {
+ throw new IllegalStateException("PUSH execution type requires
InsertIntoValues statement");
+ }
+ InsertIntoValues insertStmt = (InsertIntoValues) statement;
+
+ InsertRequest request = buildInsertRequest(insertStmt);
+
+ // If a local executor is available (controller-side), call it directly
+ InsertExecutor localExecutor = _insertExecutor;
+ if (localExecutor != null) {
+ LOGGER.info("Executing push-based INSERT locally via InsertExecutor");
+ return localExecutor.execute(request);
+ }
+
+ // Otherwise, POST to controller /insert/execute (broker-side)
+ String controllerBaseUrl = getControllerUrl();
+ String url = controllerBaseUrl + "/insert/execute";
+ String payload = JsonUtils.objectToString(request);
+
+ LOGGER.info("Submitting push-based INSERT to controller: {}", url);
+
+ Map<String, String> requestHeaders = new HashMap<>();
+ requestHeaders.put("Content-Type", "application/json");
+ requestHeaders.put("accept", "application/json");
+ if (headers != null) {
+ requestHeaders.putAll(headers);
+ }
+
+ String responseStr =
+
org.apache.pinot.common.utils.http.HttpClient.wrapAndThrowHttpException(
+ org.apache.pinot.common.utils.http.HttpClient.getInstance()
+ .sendJsonPostRequest(new java.net.URL(url).toURI(), payload,
requestHeaders))
+ .getResponse();
+ JsonNode responseJson = JsonUtils.stringToJsonNode(responseStr);
+ return JsonUtils.jsonNodeToObject(responseJson, InsertResult.class);
+ }
+
+ private static InsertRequest buildInsertRequest(InsertIntoValues insertStmt)
{
+ List<GenericRow> genericRows = new ArrayList<>();
+ List<String> columns = insertStmt.getColumns();
+ for (List<Object> rowValues : insertStmt.getRows()) {
+ GenericRow genericRow = new GenericRow();
+ if (columns.isEmpty()) {
+ throw new IllegalStateException(
+ "INSERT INTO ... VALUES requires an explicit column list, "
+ + "e.g. INSERT INTO myTable (col1, col2) VALUES (1, 'a')");
+ }
Review Comment:
This code rejects `INSERT INTO ... VALUES` when the column list is omitted,
but the PR description and tutorial claim that positional inserts without a
column list are supported. Either update the documentation and PR description to
match the actual behavior, or implement positional-to-schema column mapping so
that `INSERT INTO t VALUES (...)` works as documented.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]