xiangfu0 commented on code in PR #17939:
URL: https://github.com/apache/pinot/pull/17939#discussion_r3069108264


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/ingest/InsertRowApplier.java:
##########
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.ingest;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.ingest.PreparedStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Applies committed insert data to Pinot's mutable segment path.
+ *
+ * <p>On commit, reads prepared batches from the {@link PreparedStore} and 
provides them for
+ * indexing into mutable segments. Tracks applied statement IDs in a 
persistent file to enable
+ * idempotent replay after server restart.
+ *
+ * <p>The applied-statement tracking file is stored at
+ * {@code <dataDir>/insert/applied_statements.log}.
+ *
+ * <p>This class is thread-safe for concurrent apply operations on different 
statements. Concurrent
+ * apply on the same statement is not expected.
+ */
+public class InsertRowApplier {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(InsertRowApplier.class);
+  private static final String APPLIED_STATEMENTS_FILE = 
"applied_statements.log";
+
+  private final PreparedStore _preparedStore;
+  private final File _appliedStatementsFile;
+  private final Set<String> _appliedStatements;
+  private final Object _fileLock = new Object();
+
+  /**
+   * Creates a new insert row applier.
+   *
+   * @param preparedStore the prepared store to read batches from
+   * @param dataDir the data directory for storing the applied-statements 
tracking file
+   */
+  public InsertRowApplier(PreparedStore preparedStore, File dataDir) {
+    _preparedStore = preparedStore;
+    File insertDir = new File(dataDir, "insert");
+    if (!insertDir.exists() && !insertDir.mkdirs()) {
+      throw new RuntimeException("Failed to create insert directory: " + 
insertDir);
+    }
+    _appliedStatementsFile = new File(insertDir, APPLIED_STATEMENTS_FILE);
+    _appliedStatements = loadAppliedStatements();
+  }
+
+  /**
+   * Applies the committed data for the given statement and partition. Returns 
the deserialized
+   * rows that should be indexed into the mutable segment.
+   *
+   * <p>This method is idempotent: if the statement has already been applied 
(tracked in the
+   * applied-statements file), it returns {@code null} to indicate no action 
is needed.
+   *
+   * @param statementId the statement identifier
+   * @param partitionId the partition identifier
+   * @param sequenceNo the sequence number
+   * @return the list of rows to index, or {@code null} if already applied
+   * @throws IOException if reading or deserializing the prepared data fails
+   */
+  public List<GenericRow> apply(String statementId, int partitionId, long 
sequenceNo)
+      throws IOException {
+    String applyKey = statementId + ":" + partitionId + ":" + sequenceNo;
+
+    if (_appliedStatements.contains(applyKey)) {
+      LOGGER.info("Statement {} partition {} seq {} already applied, skipping", statementId, partitionId, sequenceNo);
+      return null;
+    }
+
+    byte[] data = _preparedStore.load(statementId, partitionId, sequenceNo);
+    if (data == null) {
+      LOGGER.warn("No prepared data found for statement {} partition {} seq {}", statementId, partitionId, sequenceNo);
+      return null;
+    }
+
+    List<GenericRow> rows = GenericRowSerializer.deserializeRows(data);
+    LOGGER.info("Applied {} rows for statement {} partition {} seq {}", 
rows.size(), statementId, partitionId,
+        sequenceNo);
+
+    // Record that this batch has been applied
+    markApplied(applyKey);
+
+    return rows;
+  }
+
+  /**
+   * Returns whether a given statement/partition/sequence has already been 
applied.
+   *
+   * @param statementId the statement identifier
+   * @param partitionId the partition identifier
+   * @param sequenceNo the sequence number
+   * @return {@code true} if already applied
+   */
+  public boolean isApplied(String statementId, int partitionId, long 
sequenceNo) {
+    String applyKey = statementId + ":" + partitionId + ":" + sequenceNo;
+    return _appliedStatements.contains(applyKey);
+  }
+
+  private void markApplied(String applyKey) {
+    synchronized (_fileLock) {
+      _appliedStatements.add(applyKey);
+      try (BufferedWriter writer = new BufferedWriter(new 
FileWriter(_appliedStatementsFile, true))) {
+        writer.write(applyKey);
+        writer.newLine();
+        writer.flush();
+      } catch (IOException e) {

Review Comment:
   Addressed in `e805c72443`. The applied-statements log now writes via 
`Files.newBufferedWriter(..., UTF_8)` instead of the platform-default 
`FileWriter`.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/ingest/InsertRowApplier.java:
##########
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.ingest;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.ingest.PreparedStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Applies committed insert data to Pinot's mutable segment path.
+ *
+ * <p>On commit, reads prepared batches from the {@link PreparedStore} and 
provides them for
+ * indexing into mutable segments. Tracks applied statement IDs in a 
persistent file to enable
+ * idempotent replay after server restart.
+ *
+ * <p>The applied-statement tracking file is stored at
+ * {@code <dataDir>/insert/applied_statements.log}.
+ *
+ * <p>This class is thread-safe for concurrent apply operations on different 
statements. Concurrent
+ * apply on the same statement is not expected.
+ */
+public class InsertRowApplier {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(InsertRowApplier.class);
+  private static final String APPLIED_STATEMENTS_FILE = 
"applied_statements.log";
+
+  private final PreparedStore _preparedStore;
+  private final File _appliedStatementsFile;
+  private final Set<String> _appliedStatements;
+  private final Object _fileLock = new Object();
+
+  /**
+   * Creates a new insert row applier.
+   *
+   * @param preparedStore the prepared store to read batches from
+   * @param dataDir the data directory for storing the applied-statements 
tracking file
+   */
+  public InsertRowApplier(PreparedStore preparedStore, File dataDir) {
+    _preparedStore = preparedStore;
+    File insertDir = new File(dataDir, "insert");
+    if (!insertDir.exists() && !insertDir.mkdirs()) {
+      throw new RuntimeException("Failed to create insert directory: " + 
insertDir);
+    }
+    _appliedStatementsFile = new File(insertDir, APPLIED_STATEMENTS_FILE);
+    _appliedStatements = loadAppliedStatements();
+  }
+
+  /**
+   * Applies the committed data for the given statement and partition. Returns 
the deserialized
+   * rows that should be indexed into the mutable segment.
+   *
+   * <p>This method is idempotent: if the statement has already been applied 
(tracked in the
+   * applied-statements file), it returns {@code null} to indicate no action 
is needed.
+   *
+   * @param statementId the statement identifier
+   * @param partitionId the partition identifier
+   * @param sequenceNo the sequence number
+   * @return the list of rows to index, or {@code null} if already applied
+   * @throws IOException if reading or deserializing the prepared data fails
+   */
+  public List<GenericRow> apply(String statementId, int partitionId, long 
sequenceNo)
+      throws IOException {
+    String applyKey = statementId + ":" + partitionId + ":" + sequenceNo;
+
+    if (_appliedStatements.contains(applyKey)) {
+      LOGGER.info("Statement {} partition {} seq {} already applied, skipping", statementId, partitionId, sequenceNo);
+      return null;
+    }
+
+    byte[] data = _preparedStore.load(statementId, partitionId, sequenceNo);
+    if (data == null) {
+      LOGGER.warn("No prepared data found for statement {} partition {} seq {}", statementId, partitionId, sequenceNo);
+      return null;
+    }
+
+    List<GenericRow> rows = GenericRowSerializer.deserializeRows(data);
+    LOGGER.info("Applied {} rows for statement {} partition {} seq {}", 
rows.size(), statementId, partitionId,
+        sequenceNo);
+
+    // Record that this batch has been applied
+    markApplied(applyKey);
+
+    return rows;
+  }
+
+  /**
+   * Returns whether a given statement/partition/sequence has already been 
applied.
+   *
+   * @param statementId the statement identifier
+   * @param partitionId the partition identifier
+   * @param sequenceNo the sequence number
+   * @return {@code true} if already applied
+   */
+  public boolean isApplied(String statementId, int partitionId, long 
sequenceNo) {
+    String applyKey = statementId + ":" + partitionId + ":" + sequenceNo;
+    return _appliedStatements.contains(applyKey);
+  }
+
+  private void markApplied(String applyKey) {
+    synchronized (_fileLock) {
+      _appliedStatements.add(applyKey);
+      try (BufferedWriter writer = new BufferedWriter(new 
FileWriter(_appliedStatementsFile, true))) {
+        writer.write(applyKey);
+        writer.newLine();
+        writer.flush();
+      } catch (IOException e) {
+        LOGGER.error("Failed to record applied statement: {}", applyKey, e);
+      }
+    }
+  }
+
+  /**
+   * Removes tracking entries for the given statement from the in-memory set 
and rewrites the
+   * applied-statements file. Called during GC to prevent the tracking file 
from growing without
+   * bound.
+   *
+   * @param statementId the statement identifier to remove tracking entries for
+   */
+  public void cleanAppliedEntries(String statementId) {
+    String prefix = statementId + ":";
+    synchronized (_fileLock) {
+      boolean removed = _appliedStatements.removeIf(key -> 
key.startsWith(prefix));
+      if (removed) {
+        rewriteAppliedStatementsFile();
+        LOGGER.info("Cleaned applied tracking entries for statement: {}", 
statementId);
+      }
+    }
+  }
+
+  /**
+   * Rewrites the applied-statements file from the current in-memory set.
+   * Must be called under {@code _fileLock}.
+   */
+  private void rewriteAppliedStatementsFile() {
+    try (BufferedWriter writer = new BufferedWriter(new 
FileWriter(_appliedStatementsFile, false))) {
+      for (String key : _appliedStatements) {
+        writer.write(key);
+        writer.newLine();
+      }
+      writer.flush();
+    } catch (IOException e) {
+      LOGGER.error("Failed to rewrite applied statements file: {}", 
_appliedStatementsFile, e);
+    }
+  }
+
+  private Set<String> loadAppliedStatements() {
+    Set<String> applied = new HashSet<>();
+    if (!_appliedStatementsFile.exists()) {
+      return applied;
+    }
+    try (BufferedReader reader = new BufferedReader(new 
FileReader(_appliedStatementsFile))) {
+      String line;

Review Comment:
   Addressed in `e805c72443`. The replay path now reads the applied-statements 
log with `Files.newBufferedReader(..., UTF_8)` to match the write side.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/ingest/FileInsertArtifactPublisher.java:
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.ingest;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingest.ArtifactPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Manages staging and publishing of segment artifacts produced by file-based 
INSERT INTO statements.
+ *
+ * <p>Artifacts are staged in a temporary namespace within the deep store 
(under a
+ * {@code _insert_staging/<statementId>/} prefix). On commit, they are moved 
to their final
+ * location. On abort, staged artifacts are cleaned up.
+ *
+ * <p>This publisher delegates to the configured {@link PinotFS} 
implementation for the deep store
+ * scheme.
+ *
+ * <p>This class is thread-safe for concurrent operations across different 
statements.
+ */
+public class FileInsertArtifactPublisher implements ArtifactPublisher {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FileInsertArtifactPublisher.class);
+
+  static final String STAGING_DIR_PREFIX = "_insert_staging";
+  static final String DEEP_STORE_URI_KEY = "controller.data.dir";
+
+  private URI _deepStoreBaseUri;
+
+  /**
+   * Tracks which statements have staged artifacts so cleanup can enumerate 
them.
+   */
+  private final ConcurrentHashMap<String, Set<URI>> _stagedArtifacts = new 
ConcurrentHashMap<>();
+
+  @Override
+  public void init(PinotConfiguration config) {
+    String deepStoreUri = config.getProperty(DEEP_STORE_URI_KEY);
+    if (deepStoreUri != null) {
+      _deepStoreBaseUri = URI.create(deepStoreUri);
+    }
+  }

Review Comment:
   Addressed in `e805c72443`. `controller.data.dir` is now normalized through 
`URIUtils.getUri(...)`, so scheme-less local paths resolve to a schemeful URI 
before `PinotFSFactory.create(...)`.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/ingest/RowInsertExecutor.java:
##########
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.ingest;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.ingest.InsertExecutor;
+import org.apache.pinot.spi.ingest.InsertRequest;
+import org.apache.pinot.spi.ingest.InsertResult;
+import org.apache.pinot.spi.ingest.InsertStatementState;
+import org.apache.pinot.spi.ingest.PreparedStore;
+import org.apache.pinot.spi.ingest.ShardLog;
+import org.apache.pinot.spi.ingest.ShardLogProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Server-side INSERT INTO executor using a local durable log and prepared 
store.
+ *
+ * <p><strong>Note (v1 limitation):</strong> This executor requires table 
configs to be registered
+ * via {@link RowInsertExecutorFactory#registerTableConfig}. In v1, the 
primary INSERT path uses
+ * {@code ControllerRowInsertExecutor} which resolves table configs from 
ZooKeeper. This server-side
+ * executor is intended for a future direct-server-ingestion path where the 
server's
+ * {@code InstanceDataManager} will register table configs as tables are 
loaded.
+ *
+ * <p>The execution flow for each request:
+ * <ol>
+ *   <li>Validate the request (table exists, rows present, etc.)</li>
+ *   <li>Route rows to target partition(s) based on partition function</li>
+ *   <li>Write rows to the {@link ShardLog} for each partition</li>
+ *   <li>Write prepared batch to the {@link PreparedStore}</li>
+ *   <li>Return {@link InsertResult} with state={@link InsertStatementState#PREPARED}</li>
+ * </ol>
+ *
+ * <p>This class is thread-safe; a single instance may be invoked concurrently 
from multiple
+ * request threads.
+ */
+public class RowInsertExecutor implements InsertExecutor {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RowInsertExecutor.class);
+
+  private final ShardLogProvider _shardLogProvider;
+  private final PreparedStore _preparedStore;
+  private final ConcurrentHashMap<String, InsertStatementState> 
_statementStates = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, TableConfig> _tableConfigs;
+
+  /**
+   * Creates a new row insert executor.
+   *
+   * @param shardLogProvider the provider for shard logs
+   * @param preparedStore the prepared store for staging data
+   * @param tableConfigs a map of table name to table config for validation 
and partition routing
+   */
+  public RowInsertExecutor(ShardLogProvider shardLogProvider, PreparedStore 
preparedStore,
+      ConcurrentHashMap<String, TableConfig> tableConfigs) {
+    _shardLogProvider = shardLogProvider;
+    _preparedStore = preparedStore;
+    _tableConfigs = tableConfigs;
+  }
+
+  @Override
+  public InsertResult execute(InsertRequest request) {
+    String statementId = request.getStatementId();
+    String tableName = request.getTableName();
+    List<GenericRow> rows = request.getRows();
+
+    LOGGER.info("Executing INSERT for statement {} on table {} with {} rows", 
statementId, tableName, rows.size());
+
+    // Validate the request
+    if (tableName == null || tableName.isEmpty()) {
+      return buildErrorResult(statementId, "Table name is required", 
"INVALID_TABLE");
+    }
+    if (rows == null || rows.isEmpty()) {
+      return buildErrorResult(statementId, "At least one row is required for ROW insert", "EMPTY_ROWS");
+    }
+
+    TableConfig tableConfig = _tableConfigs.get(tableName);
+    if (tableConfig == null) {
+      return buildErrorResult(statementId, "Table not found: " + tableName, 
"TABLE_NOT_FOUND");
+    }
+
+    // Create partition router
+    InsertPartitionRouter router;
+    try {
+      router = new InsertPartitionRouter(tableConfig);
+    } catch (IllegalArgumentException e) {
+      return buildErrorResult(statementId, e.getMessage(), 
"PARTITION_CONFIG_ERROR");
+    }
+
+    // Route rows to partitions
+    Map<Integer, List<GenericRow>> partitionedRows = router.routeRows(rows);
+
+    _statementStates.put(statementId, InsertStatementState.ACCEPTED);
+
+    // Write rows to shard log and prepared store for each partition
+    try {
+      for (Map.Entry<Integer, List<GenericRow>> entry : 
partitionedRows.entrySet()) {
+        int partitionId = entry.getKey();
+        List<GenericRow> partitionRows = entry.getValue();
+
+        ShardLog shardLog = _shardLogProvider.getShardLog(tableName, 
partitionId);
+
+        // Serialize and append rows to the shard log
+        byte[] serializedRows = 
GenericRowSerializer.serializeRows(partitionRows);
+        long startOffset = shardLog.append(serializedRows);
+        long endOffset = startOffset;  // Single append, so start == end
+
+        // Also write to prepared store for durability
+        _preparedStore.store(statementId, partitionId, 0, serializedRows);
+
+        // Mark the offset range as prepared in the shard log
+        shardLog.prepare(statementId, startOffset, endOffset);
+      }
+
+      _statementStates.put(statementId, InsertStatementState.PREPARED);
+
+      LOGGER.info("INSERT statement {} prepared with {} partition(s)", 
statementId, partitionedRows.size());
+
+      return new InsertResult.Builder()
+          .setStatementId(statementId)
+          .setState(InsertStatementState.PREPARED)
+          .setMessage("Insert prepared with " + rows.size() + " rows across "
+              + partitionedRows.size() + " partition(s)")
+          .build();
+    } catch (Exception e) {
+      LOGGER.error("Failed to execute INSERT for statement {}", statementId, 
e);
+      _statementStates.put(statementId, InsertStatementState.ABORTED);
+      return buildErrorResult(statementId, "Failed to write rows: " + 
e.getMessage(), "WRITE_ERROR");
+    }
+  }
+
+  @Override
+  public InsertResult getStatus(String statementId) {
+    InsertStatementState state = _statementStates.get(statementId);
+    if (state == null) {
+      return new InsertResult.Builder()
+          .setStatementId(statementId)
+          .setState(InsertStatementState.ABORTED)
+          .setMessage("Unknown statement: " + statementId)
+          .build();
+    }
+    return new InsertResult.Builder()
+        .setStatementId(statementId)
+        .setState(state)
+        .build();
+  }
+
+  @Override
+  public InsertResult abort(String statementId) {
+    InsertStatementState currentState = _statementStates.get(statementId);
+    if (currentState == null) {
+      return new InsertResult.Builder()
+          .setStatementId(statementId)
+          .setState(InsertStatementState.ABORTED)
+          .setMessage("Unknown statement: " + statementId)
+          .build();
+    }
+
+    _statementStates.put(statementId, InsertStatementState.ABORTED);
+    _preparedStore.cleanup(statementId);
+

Review Comment:
   Addressed in `e805c72443`. `RowInsertExecutor` now tracks the prepared 
partitions per statement, and `abort()` rolls back both shard-log state and 
prepared-store data for those partitions.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/ingest/LocalShardLog.java:
##########
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.ingest;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.pinot.spi.ingest.ShardLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * File-based append-only implementation of {@link ShardLog}.
+ *
+ * <p>Data is written as length-prefixed entries to a log file:
+ * <pre>
+ *   [4 bytes: entry length][N bytes: entry data]
+ * </pre>
+ *
+ * <p>The offset returned by {@link #append(byte[])} is the byte offset in the 
log file where the
+ * entry begins. Statement metadata (prepared/committed/aborted state and 
offset ranges) is tracked
+ * in a separate metadata file.
+ *
+ * <p>This implementation is thread-safe for concurrent appends via a 
read-write lock.
+ * Prepare/commit/abort operations for a given statement are expected to be 
serialized externally.
+ */
+public class LocalShardLog implements ShardLog {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(LocalShardLog.class);
+  private static final String LOG_FILE_NAME = "data.log";
+  private static final String META_FILE_NAME = "meta.log";
+
+  private final File _logDir;
+  private final File _logFile;
+  private final File _metaFile;
+  private final ReadWriteLock _lock = new ReentrantReadWriteLock();
+  private final AtomicLong _writeOffset = new AtomicLong(0);
+  private final Map<String, StatementMeta> _statementMetas = new 
ConcurrentHashMap<>();
+
+  /**
+   * Creates a new local shard log under the given directory.
+   *
+   * @param logDir the directory for this shard log's files
+   * @throws IOException if the directory cannot be created or the log file 
cannot be initialized
+   */
+  public LocalShardLog(File logDir)
+      throws IOException {
+    _logDir = logDir;
+    if (!_logDir.exists() && !_logDir.mkdirs()) {
+      throw new IOException("Failed to create log directory: " + _logDir);
+    }
+    _logFile = new File(_logDir, LOG_FILE_NAME);
+    _metaFile = new File(_logDir, META_FILE_NAME);
+
+    if (_logFile.exists()) {
+      _writeOffset.set(_logFile.length());
+    }
+
+    // Recover statement metadata from the meta file
+    recoverMetadata();
+  }
+
+  @Override
+  public long append(byte[] data) {
+    _lock.writeLock().lock();
+    try {
+      long offset = _writeOffset.get();
+      try (FileOutputStream fos = new FileOutputStream(_logFile, true);
+           FileChannel channel = fos.getChannel()) {
+        ByteBuffer header = ByteBuffer.allocate(4);
+        header.putInt(data.length);
+        header.flip();
+        channel.write(header);
+        channel.write(ByteBuffer.wrap(data));
+        channel.force(true);
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to append to shard log: " + 
_logFile, e);
+      }
+      _writeOffset.addAndGet(4 + data.length);
+      return offset;
+    } finally {
+      _lock.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public void prepare(String statementId, long startOffset, long endOffset) {
+    StatementMeta meta = new StatementMeta(statementId, startOffset, 
endOffset, StatementStatus.PREPARED);
+    _statementMetas.put(statementId, meta);
+    persistMetaEntry(statementId, startOffset, endOffset, 
StatementStatus.PREPARED);
+  }
+
+  @Override
+  public void commit(String statementId) {
+    StatementMeta existing = _statementMetas.get(statementId);
+    if (existing == null) {
+      LOGGER.warn("Attempted to commit unknown statement: {}", statementId);
+      return;
+    }
+    StatementMeta committed = new StatementMeta(
+        statementId, existing._startOffset, existing._endOffset, 
StatementStatus.COMMITTED);
+    _statementMetas.put(statementId, committed);
+    persistMetaEntry(statementId, existing._startOffset, existing._endOffset, 
StatementStatus.COMMITTED);
+  }
+
+  @Override
+  public void abort(String statementId) {
+    StatementMeta existing = _statementMetas.get(statementId);
+    long startOffset = existing != null ? existing._startOffset : -1;
+    long endOffset = existing != null ? existing._endOffset : -1;
+    StatementMeta aborted = new StatementMeta(statementId, startOffset, 
endOffset, StatementStatus.ABORTED);
+    _statementMetas.put(statementId, aborted);
+    persistMetaEntry(statementId, startOffset, endOffset, 
StatementStatus.ABORTED);
+  }
+
+  @Override
+  public Iterator<byte[]> read(long fromOffset) {
+    return new LogIterator(_logFile, fromOffset, _writeOffset.get());
+  }
+
+  /**
+   * Returns the current write offset (end of log).
+   */
+  public long getWriteOffset() {
+    return _writeOffset.get();
+  }
+
+  /**
+   * Returns the metadata for the given statement, or {@code null} if not 
found.
+   */
+  public StatementMeta getStatementMeta(String statementId) {
+    return _statementMetas.get(statementId);
+  }
+
+  private void persistMetaEntry(String statementId, long startOffset, long 
endOffset, StatementStatus status) {
+    try (FileOutputStream fos = new FileOutputStream(_metaFile, true);
+         FileChannel channel = fos.getChannel()) {
+      byte[] line = (statementId + "," + startOffset + "," + endOffset + "," + 
status.name() + "\n")
+          .getBytes(java.nio.charset.StandardCharsets.UTF_8);
+      channel.write(ByteBuffer.wrap(line));
+      channel.force(true);
+    } catch (IOException e) {
+      String message = "Failed to persist metadata for statement: " + 
statementId;
+      LOGGER.error(message, e);
+      throw new IllegalStateException(message, e);
+    }
+  }
+
+  private void recoverMetadata() {
+    if (!_metaFile.exists()) {
+      return;
+    }
+    try (BufferedReader reader = new BufferedReader(new 
FileReader(_metaFile))) {
+      String line;
+      while ((line = reader.readLine()) != null) {

Review Comment:
   Addressed in `e805c72443`. `recoverMetadata()` now reads `meta.log` with 
explicit UTF-8, matching the persisted metadata format. I also added a recovery 
test for the UTF-8 path.



##########
pinot-common/src/main/java/org/apache/pinot/sql/parsers/parser/SqlInsertIntoValues.java:
##########
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.parsers.parser;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+
+/**
+ * Calcite extension for creating an INSERT INTO ... VALUES sql node.
+ *
+ * <p>Syntax:
+ * {@code INSERT INTO [db_name.]table_name [(col1, col2, ...)] VALUES (val1, val2, ...) [, (val1, val2, ...)]}
+ *
+ * <p>This node is not thread-safe; it is created during parsing and consumed during compilation.
+ */
+public class SqlInsertIntoValues extends SqlCall {
+  private static final SqlSpecialOperator OPERATOR =
+      new SqlSpecialOperator("INSERT_INTO_VALUES", SqlKind.OTHER_DDL);
+
+  private final SqlIdentifier _dbName;
+  private final SqlIdentifier _tableName;
+  private final SqlNodeList _columnList;
+  private final List<SqlNodeList> _valuesList;
+
+  /**
+   * Creates an INSERT INTO ... VALUES node.
+   *
+   * @param pos parser position
+   * @param dbName optional database name (null if not specified)
+   * @param tableName target table name
+   * @param columnList optional column list (null if not specified)
+   * @param valuesList list of value rows; each row is a SqlNodeList of literal expressions
+   */
+  public SqlInsertIntoValues(SqlParserPos pos, @Nullable SqlIdentifier dbName, SqlIdentifier tableName,
+      @Nullable SqlNodeList columnList, List<SqlNodeList> valuesList) {
+    super(pos);
+    _dbName = dbName;
+    _tableName = tableName;
+    _columnList = columnList;
+    _valuesList = valuesList;
+  }
+
+  @Nullable
+  public SqlIdentifier getDbName() {
+    return _dbName;
+  }
+
+  public SqlIdentifier getTableName() {
+    return _tableName;
+  }
+
+  @Nullable
+  public SqlNodeList getColumnList() {
+    return _columnList;
+  }
+
+  public List<SqlNodeList> getValuesList() {
+    return _valuesList;
+  }
+
+  @Override
+  public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+    UnparseUtils u = new UnparseUtils(writer, leftPrec, rightPrec);
+    u.keyword("INSERT", "INTO");
+    if (_dbName != null) {
+      u.node(_dbName).keyword(".");

Review Comment:
   Addressed in `e805c72443`. I switched the qualified-name unparse path to 
emit a single combined `SqlIdentifier` instead of treating `.` as a standalone 
token, which removes the `db . table` rendering issue. Added parser assertions 
for both VALUES and FROM FILE forms.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/ingest/LocalShardLog.java:
##########
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.ingest;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.pinot.spi.ingest.ShardLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * File-based append-only implementation of {@link ShardLog}.
+ *
+ * <p>Data is written as length-prefixed entries to a log file:
+ * <pre>
+ *   [4 bytes: entry length][N bytes: entry data]
+ * </pre>
+ *
+ * <p>The offset returned by {@link #append(byte[])} is the byte offset in the log file where the
+ * entry begins. Statement metadata (prepared/committed/aborted state and offset ranges) is tracked
+ * in a separate metadata file.
+ *
+ * <p>This implementation is thread-safe for concurrent appends via a read-write lock.
+ * Prepare/commit/abort operations for a given statement are expected to be serialized externally.
+ */
+public class LocalShardLog implements ShardLog {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(LocalShardLog.class);
+  private static final String LOG_FILE_NAME = "data.log";
+  private static final String META_FILE_NAME = "meta.log";
+
+  private final File _logDir;
+  private final File _logFile;
+  private final File _metaFile;
+  private final ReadWriteLock _lock = new ReentrantReadWriteLock();
+  private final AtomicLong _writeOffset = new AtomicLong(0);
+  private final Map<String, StatementMeta> _statementMetas = new ConcurrentHashMap<>();
+
+  /**
+   * Creates a new local shard log under the given directory.
+   *
+   * @param logDir the directory for this shard log's files
+   * @throws IOException if the directory cannot be created or the log file cannot be initialized
+   */
+  public LocalShardLog(File logDir)
+      throws IOException {
+    _logDir = logDir;
+    if (!_logDir.exists() && !_logDir.mkdirs()) {
+      throw new IOException("Failed to create log directory: " + _logDir);
+    }
+    _logFile = new File(_logDir, LOG_FILE_NAME);
+    _metaFile = new File(_logDir, META_FILE_NAME);
+
+    if (_logFile.exists()) {
+      _writeOffset.set(_logFile.length());
+    }
+
+    // Recover statement metadata from the meta file
+    recoverMetadata();
+  }
+
+  @Override
+  public long append(byte[] data) {
+    _lock.writeLock().lock();
+    try {
+      long offset = _writeOffset.get();
+      try (FileOutputStream fos = new FileOutputStream(_logFile, true);
+           FileChannel channel = fos.getChannel()) {
+        ByteBuffer header = ByteBuffer.allocate(4);
+        header.putInt(data.length);
+        header.flip();
+        channel.write(header);
+        channel.write(ByteBuffer.wrap(data));
+        channel.force(true);
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to append to shard log: " + _logFile, e);
+      }
+      _writeOffset.addAndGet(4 + data.length);
+      return offset;
+    } finally {
+      _lock.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public void prepare(String statementId, long startOffset, long endOffset) {
+    StatementMeta meta = new StatementMeta(statementId, startOffset, endOffset, StatementStatus.PREPARED);
+    _statementMetas.put(statementId, meta);
+    persistMetaEntry(statementId, startOffset, endOffset, StatementStatus.PREPARED);
+  }
+
+  @Override
+  public void commit(String statementId) {
+    StatementMeta existing = _statementMetas.get(statementId);
+    if (existing == null) {
+      LOGGER.warn("Attempted to commit unknown statement: {}", statementId);
+      return;
+    }
+    StatementMeta committed = new StatementMeta(
+        statementId, existing._startOffset, existing._endOffset, StatementStatus.COMMITTED);
+    _statementMetas.put(statementId, committed);
+    persistMetaEntry(statementId, existing._startOffset, existing._endOffset, StatementStatus.COMMITTED);
+  }
+
+  @Override
+  public void abort(String statementId) {
+    StatementMeta existing = _statementMetas.get(statementId);
+    long startOffset = existing != null ? existing._startOffset : -1;
+    long endOffset = existing != null ? existing._endOffset : -1;
+    StatementMeta aborted = new StatementMeta(statementId, startOffset, endOffset, StatementStatus.ABORTED);
+    _statementMetas.put(statementId, aborted);
+    persistMetaEntry(statementId, startOffset, endOffset, StatementStatus.ABORTED);
+  }
+
+  @Override
+  public Iterator<byte[]> read(long fromOffset) {
+    return new LogIterator(_logFile, fromOffset, _writeOffset.get());
+  }
+
+  /**
+   * Returns the current write offset (end of log).
+   */
+  public long getWriteOffset() {
+    return _writeOffset.get();
+  }
+
+  /**
+   * Returns the metadata for the given statement, or {@code null} if not found.
+   */
+  public StatementMeta getStatementMeta(String statementId) {
+    return _statementMetas.get(statementId);
+  }
+
+  private void persistMetaEntry(String statementId, long startOffset, long endOffset, StatementStatus status) {
+    try (FileOutputStream fos = new FileOutputStream(_metaFile, true);
+         FileChannel channel = fos.getChannel()) {
+      byte[] line = (statementId + "," + startOffset + "," + endOffset + "," + status.name() + "\n")
+          .getBytes(java.nio.charset.StandardCharsets.UTF_8);
+      channel.write(ByteBuffer.wrap(line));
+      channel.force(true);
+    } catch (IOException e) {
+      String message = "Failed to persist metadata for statement: " + statementId;
+      LOGGER.error(message, e);
+      throw new IllegalStateException(message, e);

Review Comment:
   Addressed in `e805c72443`. `persistMetaEntry()` now runs under 
`_lock.writeLock()` so concurrent `prepare/commit/abort` calls cannot 
interleave metadata appends.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/ingest/InsertRowApplier.java:
##########
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.ingest;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.ingest.PreparedStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Applies committed insert data to Pinot's mutable segment path.
+ *
+ * <p>On commit, reads prepared batches from the {@link PreparedStore} and provides them for
+ * indexing into mutable segments. Tracks applied statement IDs in a persistent file to enable
+ * idempotent replay after server restart.
+ *
+ * <p>The applied-statement tracking file is stored at
+ * {@code <dataDir>/insert/applied_statements.log}.
+ *
+ * <p>This class is thread-safe for concurrent apply operations on different statements. Concurrent
+ * apply on the same statement is not expected.
+ */
+public class InsertRowApplier {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(InsertRowApplier.class);
+  private static final String APPLIED_STATEMENTS_FILE = "applied_statements.log";
+
+  private final PreparedStore _preparedStore;
+  private final File _appliedStatementsFile;
+  private final Set<String> _appliedStatements;
+  private final Object _fileLock = new Object();
+
+  /**
+   * Creates a new insert row applier.
+   *
+   * @param preparedStore the prepared store to read batches from
+   * @param dataDir the data directory for storing the applied-statements tracking file
+   */
+  public InsertRowApplier(PreparedStore preparedStore, File dataDir) {
+    _preparedStore = preparedStore;
+    File insertDir = new File(dataDir, "insert");
+    if (!insertDir.exists() && !insertDir.mkdirs()) {
+      throw new RuntimeException("Failed to create insert directory: " + insertDir);
+    }
+    _appliedStatementsFile = new File(insertDir, APPLIED_STATEMENTS_FILE);
+    _appliedStatements = loadAppliedStatements();
+  }
+
+  /**
+   * Applies the committed data for the given statement and partition. Returns the deserialized
+   * rows that should be indexed into the mutable segment.
+   *
+   * <p>This method is idempotent: if the statement has already been applied (tracked in the
+   * applied-statements file), it returns {@code null} to indicate no action is needed.
+   *
+   * @param statementId the statement identifier
+   * @param partitionId the partition identifier
+   * @param sequenceNo the sequence number
+   * @return the list of rows to index, or {@code null} if already applied
+   * @throws IOException if reading or deserializing the prepared data fails
+   */
+  public List<GenericRow> apply(String statementId, int partitionId, long sequenceNo)
+      throws IOException {
+    String applyKey = statementId + ":" + partitionId + ":" + sequenceNo;
+
+    if (_appliedStatements.contains(applyKey)) {
+      LOGGER.info("Statement {} partition {} seq {} already applied, skipping", statementId, partitionId, sequenceNo);
+      return null;
+    }
+
+    byte[] data = _preparedStore.load(statementId, partitionId, sequenceNo);
+    if (data == null) {
+      LOGGER.warn("No prepared data found for statement {} partition {} seq {}", statementId, partitionId, sequenceNo);
+      return null;
+    }
+
+    List<GenericRow> rows = GenericRowSerializer.deserializeRows(data);
+    LOGGER.info("Deserialized {} rows for statement {} partition {} seq {}", rows.size(), statementId, partitionId,
+        sequenceNo);
+
+    // Note: do NOT mark as applied here. The caller must invoke confirmApplied() after
+    // successfully indexing the rows. Marking before indexing would cause data loss if
+    // indexing fails — the batch would be permanently skipped on subsequent recovery.
+
+    return rows;
+  }
+
+  /**
+   * Confirms that a batch has been successfully indexed and should not be replayed again.
+   * Must be called by the caller after the rows returned by {@link #apply} have been
+   * successfully indexed into the mutable segment.
+   *
+   * @param statementId the statement identifier
+   * @param partitionId the partition identifier
+   * @param sequenceNo the sequence number
+   */
+  public void confirmApplied(String statementId, int partitionId, long sequenceNo) {
+    String applyKey = statementId + ":" + partitionId + ":" + sequenceNo;
+    markApplied(applyKey);
+    LOGGER.info("Confirmed apply for statement {} partition {} seq {}", statementId, partitionId, sequenceNo);
+  }
+
+  /**
+   * Returns whether a given statement/partition/sequence has already been applied.
+   *
+   * @param statementId the statement identifier
+   * @param partitionId the partition identifier
+   * @param sequenceNo the sequence number
+   * @return {@code true} if already applied
+   */
+  public boolean isApplied(String statementId, int partitionId, long sequenceNo) {
+    String applyKey = statementId + ":" + partitionId + ":" + sequenceNo;
+    return _appliedStatements.contains(applyKey);
+  }
+
+  private void markApplied(String applyKey) {
+    synchronized (_fileLock) {
+      _appliedStatements.add(applyKey);
+      try (BufferedWriter writer = new BufferedWriter(new FileWriter(_appliedStatementsFile, true))) {
+        writer.write(applyKey);
+        writer.newLine();
+        writer.flush();
+      } catch (IOException e) {
+        LOGGER.error("Failed to record applied statement: {}", applyKey, e);
+      }

Review Comment:
   Addressed in `e805c72443`. All applied-statements persistence paths now use 
explicit UTF-8, including append, rewrite, and replay.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/ingest/FileInsertArtifactPublisher.java:
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.ingest;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingest.ArtifactPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Manages staging and publishing of segment artifacts produced by file-based INSERT INTO statements.
+ *
+ * <p>Artifacts are staged in a temporary namespace within the deep store (under a
+ * {@code _insert_staging/<statementId>/} prefix). On commit, they are moved to their final
+ * location. On abort, staged artifacts are cleaned up.
+ *
+ * <p>This publisher delegates to the configured {@link PinotFS} implementation for the deep store
+ * scheme.
+ *
+ * <p>This class is thread-safe for concurrent operations across different statements.
+ */
+public class FileInsertArtifactPublisher implements ArtifactPublisher {
+  private static final Logger LOGGER = LoggerFactory.getLogger(FileInsertArtifactPublisher.class);
+
+  static final String STAGING_DIR_PREFIX = "_insert_staging";
+  static final String DEEP_STORE_URI_KEY = "controller.data.dir";
+
+  private URI _deepStoreBaseUri;
+
+  /**
+   * Tracks which statements have staged artifacts so cleanup can enumerate them.
+   */
+  private final ConcurrentHashMap<String, Set<URI>> _stagedArtifacts = new ConcurrentHashMap<>();
+
+  @Override
+  public void init(PinotConfiguration config) {
+    String deepStoreUri = config.getProperty(DEEP_STORE_URI_KEY);
+    if (deepStoreUri != null) {
+      _deepStoreBaseUri = URI.create(deepStoreUri);
+    }
+  }
+
+  /**
+   * Sets the deep store base URI directly. Useful for testing.
+   */
+  public void setDeepStoreBaseUri(URI deepStoreBaseUri) {
+    _deepStoreBaseUri = deepStoreBaseUri;
+  }
+
+  @Override
+  public URI stageArtifact(String statementId, String artifactName, InputStream data) {
+    URI stagingDir = getStagingDir(statementId);
+    URI artifactUri = URI.create(stagingDir.toString() + "/" + artifactName);
+
+    try {
+      PinotFS pinotFS = PinotFSFactory.create(stagingDir.getScheme());

Review Comment:
   Addressed in `e805c72443`. The staged artifact path now derives from the 
normalized deep-store base URI, so the same scheme-less local-path fix applies 
at this call site as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to