This is an automated email from the ASF dual-hosted git repository.

xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new dd6520c7267 Validate ingestFromURI filesystem sources (#18660)
dd6520c7267 is described below

commit dd6520c726761a6988107b96bfa41daf6617e522
Author: Xiang Fu <[email protected]>
AuthorDate: Wed Jun 3 18:03:01 2026 -0700

    Validate ingestFromURI filesystem sources (#18660)
---
 .../apache/pinot/controller/ControllerConf.java    |   7 +
 .../resources/PinotIngestionRestletResource.java   |   3 +-
 .../pinot/controller/util/FileIngestionHelper.java |  45 ++++++-
 ...PinotIngestionRestletResourceStatelessTest.java |   5 +-
 .../controller/util/FileIngestionHelperTest.java   | 148 +++++++++++++++++++++
 .../pinot/spi/filesystem/PinotFSFactory.java       |  11 ++
 .../pinot/spi/filesystem/PinotFSFactoryTest.java   |   3 +
 7 files changed, 216 insertions(+), 6 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index c4d9b131d19..9020f0be211 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -354,6 +354,8 @@ public class ControllerConf extends PinotConfiguration {
   public static final String TABLE_MIN_REPLICAS = "table.minReplicas";
   public static final String JERSEY_ADMIN_API_PORT = "jersey.admin.api.port";
   public static final String JERSEY_ADMIN_IS_PRIMARY = "jersey.admin.isprimary";
+  public static final String INGEST_FROM_URI_ALLOW_LOCAL_FILE_SYSTEM =
+      "controller.ingestFromURI.allowLocalFileSystem";
   public static final String ACCESS_CONTROL_FACTORY_CLASS = "controller.admin.access.control.factory.class";
   public static final String ACCESS_CONTROL_USERNAME = "access.control.init.username";
   public static final String ACCESS_CONTROL_PASSWORD = "access.control.init.password";
@@ -402,6 +404,7 @@ public class ControllerConf extends PinotConfiguration {
   public static final int DEFAULT_DELETED_SEGMENTS_RETENTION_IN_DAYS = 7;
   public static final int DEFAULT_TABLE_MIN_REPLICAS = 1;
   public static final int DEFAULT_JERSEY_ADMIN_PORT = 21000;
+  public static final boolean DEFAULT_INGEST_FROM_URI_ALLOW_LOCAL_FILE_SYSTEM = false;
   public static final String DEFAULT_ACCESS_CONTROL_FACTORY_CLASS =
       "org.apache.pinot.controller.api.access.AllowAllAccessFactory";
   public static final String DEFAULT_ACCESS_CONTROL_USERNAME = "admin";
@@ -1193,6 +1196,10 @@ public class ControllerConf extends PinotConfiguration {
     return getProperty(JERSEY_ADMIN_API_PORT, String.valueOf(DEFAULT_JERSEY_ADMIN_PORT));
   }
 
+  public boolean isIngestFromUriLocalFileSystemAllowed() {
+    return getProperty(INGEST_FROM_URI_ALLOW_LOCAL_FILE_SYSTEM, DEFAULT_INGEST_FROM_URI_ALLOW_LOCAL_FILE_SYSTEM);
+  }
+
   public void setInitAccessControlUsername(String username) {
     setProperty(ACCESS_CONTROL_USERNAME, username);
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java
index 5b8aae573ef..4bcce019729 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java
@@ -234,7 +234,8 @@ public class PinotIngestionRestletResource {
 
     FileIngestionHelper fileIngestionHelper =
         new FileIngestionHelper(tableConfig, schema, batchConfigMap, getControllerUri(),
-            new File(_controllerConf.getLocalTempDir(), INGESTION_DIR), authProvider);
+            new File(_controllerConf.getLocalTempDir(), INGESTION_DIR), authProvider,
+            _controllerConf.isIngestFromUriLocalFileSystemAllowed());
     return fileIngestionHelper.buildSegmentAndPush(payload);
   }
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
index 5d0f9195bd2..c9ad75c511b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
@@ -41,6 +41,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.filesystem.LocalPinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 import org.apache.pinot.spi.ingestion.segment.uploader.SegmentUploader;
@@ -73,15 +74,17 @@ public class FileIngestionHelper {
   private final URI _controllerUri;
   private final File _ingestionDir;
   private final AuthProvider _authProvider;
+  private final boolean _allowLocalFileSystemInUri;
 
   public FileIngestionHelper(TableConfig tableConfig, Schema schema, Map<String, String> batchConfigMap,
-      URI controllerUri, File ingestionDir, AuthProvider authProvider) {
+      URI controllerUri, File ingestionDir, AuthProvider authProvider, boolean allowLocalFileSystemInUri) {
     _tableConfig = tableConfig;
     _schema = schema;
     _batchConfigMap = batchConfigMap;
     _controllerUri = controllerUri;
     _ingestionDir = ingestionDir;
     _authProvider = authProvider;
+    _allowLocalFileSystemInUri = allowLocalFileSystemInUri;
   }
 
   /**
@@ -112,7 +115,7 @@ public class FileIngestionHelper {
       File inputFile = new File(inputDir, String.format(
           "%s.%s", DATA_FILE_PREFIX, _batchConfigMap.get(BatchConfigProperties.INPUT_FORMAT).toLowerCase()));
       if (payload._payloadType == PayloadType.URI) {
-        copyURIToLocal(_batchConfigMap, payload._uri, inputFile);
+        copyURIToLocal(_batchConfigMap, payload._uri, inputFile, _allowLocalFileSystemInUri);
         LOGGER.info("Copied from URI: {} to local file: {}", payload._uri, inputFile.getAbsolutePath());
       } else {
         copyMultipartToLocal(payload._multiPart, inputFile);
@@ -176,14 +179,48 @@ public class FileIngestionHelper {
    */
   public static void copyURIToLocal(Map<String, String> batchConfigMap, URI sourceFileURI, File destFile)
       throws Exception {
+    copyURIToLocal(batchConfigMap, sourceFileURI, destFile, true);
+  }
+
+  public static void copyURIToLocal(Map<String, String> batchConfigMap, URI sourceFileURI, File destFile,
+      boolean allowLocalFileSystem)
+      throws Exception {
     String sourceFileURIScheme = sourceFileURI.getScheme();
+    Preconditions.checkArgument(allowLocalFileSystem || StringUtils.isNotBlank(sourceFileURIScheme),
+        "Source URI must include a scheme: %s", sourceFileURI);
+    Preconditions.checkArgument(allowLocalFileSystem
+            || !PinotFSFactory.LOCAL_PINOT_FS_SCHEME.equalsIgnoreCase(sourceFileURIScheme),
+        "Local filesystem URIs are not allowed for /ingestFromURI: %s", sourceFileURI);
     if (!PinotFSFactory.isSchemeSupported(sourceFileURIScheme)) {
-      PinotFSFactory.register(sourceFileURIScheme, batchConfigMap.get(BatchConfigProperties.INPUT_FS_CLASS),
-          IngestionConfigUtils.getInputFsProps(batchConfigMap));
+      String inputFsClassName = batchConfigMap.get(BatchConfigProperties.INPUT_FS_CLASS);
+      Preconditions.checkArgument(StringUtils.isNotBlank(inputFsClassName),
+          "Must provide %s for unsupported scheme: %s", BatchConfigProperties.INPUT_FS_CLASS, sourceFileURIScheme);
+      Preconditions.checkArgument(allowLocalFileSystem || !isLocalPinotFSClassName(inputFsClassName),
+          "Local filesystem implementation is not allowed for /ingestFromURI: %s", inputFsClassName);
+      try {
+        PinotFSFactory.register(sourceFileURIScheme, inputFsClassName,
+            IngestionConfigUtils.getInputFsProps(batchConfigMap));
+      } catch (RuntimeException e) {
+        throw new IllegalArgumentException(
+            String.format("Invalid %s for /ingestFromURI: %s", BatchConfigProperties.INPUT_FS_CLASS,
+                inputFsClassName), e);
+      }
     }
+    Preconditions.checkArgument(allowLocalFileSystem
+            || !PinotFSFactory.isSchemeRegisteredWith(sourceFileURIScheme, LocalPinotFS.class),
+        "Local filesystem implementation is not allowed for /ingestFromURI: %s", sourceFileURIScheme);
     PinotFSFactory.create(sourceFileURIScheme).copyToLocalFile(sourceFileURI, destFile);
   }
 
+  private static boolean isLocalPinotFSClassName(String className) {
+    String rawClassName = className;
+    int pluginSeparatorIndex = className.indexOf(':');
+    if (pluginSeparatorIndex >= 0) {
+      rawClassName = className.substring(pluginSeparatorIndex + 1);
+    }
+    return LocalPinotFS.class.getName().equals(PluginManager.loadClassWithBackwardCompatibleCheck(rawClassName));
+  }
+
   /**
    * Copy the file from the uploaded multipart to a local file
    */
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotIngestionRestletResourceStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotIngestionRestletResourceStatelessTest.java
index b5a17ad85c7..e41afd2eb05 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotIngestionRestletResourceStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotIngestionRestletResourceStatelessTest.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.client.admin.PinotAdminClient;
+import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -56,7 +57,9 @@ public class PinotIngestionRestletResourceStatelessTest extends ControllerTest {
   public void setUp()
       throws Exception {
     startZk();
-    startController();
+    Map<String, Object> controllerConfig = getDefaultControllerConfiguration();
+    controllerConfig.put(ControllerConf.INGEST_FROM_URI_ALLOW_LOCAL_FILE_SYSTEM, true);
+    startController(controllerConfig);
     addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
     addFakeServerInstancesToAutoJoinHelixCluster(1, true);
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/util/FileIngestionHelperTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/util/FileIngestionHelperTest.java
new file mode 100644
index 00000000000..dab8d6eef0d
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/util/FileIngestionHelperTest.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.util;
+
+import java.io.File;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.LocalPinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
+
+
+@Test(groups = "stateless")
+public class FileIngestionHelperTest {
+  private static final File DEST_FILE = new File("unused");
+
+  @Test
+  public void testCopiesLocalFileUriWhenLocalFileSystemIsEnabled()
+      throws Exception {
+    Path tempDir = Files.createTempDirectory("pinot-ingest-uri-test");
+    try {
+      Path inputFile = tempDir.resolve("input.csv");
+      Path destFile = tempDir.resolve("dest.csv");
+      Files.writeString(inputFile, "col\nvalue\n", StandardCharsets.UTF_8);
+
+      FileIngestionHelper.copyURIToLocal(new HashMap<>(), inputFile.toUri(), destFile.toFile(), true);
+
+      assertEquals(Files.readString(destFile, StandardCharsets.UTF_8), "col\nvalue\n");
+    } finally {
+      FileUtils.deleteQuietly(tempDir.toFile());
+    }
+  }
+
+  @Test
+  public void testRejectsLocalFileUriWhenLocalFileSystemIsDisabled() {
+    IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
+        () -> FileIngestionHelper.copyURIToLocal(new HashMap<>(), URI.create("file:///tmp/input.csv"), DEST_FILE,
+            false));
+
+    assertTrue(exception.getMessage().contains("Local filesystem URIs are not allowed"));
+  }
+
+  @Test
+  public void testRejectsSchemeLessUriWhenLocalFileSystemIsDisabled() {
+    IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
+        () -> FileIngestionHelper.copyURIToLocal(new HashMap<>(), URI.create("/tmp/input.csv"), DEST_FILE, false));
+
+    assertTrue(exception.getMessage().contains("Source URI must include a scheme"));
+  }
+
+  @Test
+  public void testPreservesSchemeLessUriHandlingWhenLocalFileSystemIsEnabled() {
+    IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
+        () -> FileIngestionHelper.copyURIToLocal(new HashMap<>(), URI.create("/tmp/input.csv"), DEST_FILE, true));
+
+    assertTrue(exception.getMessage().contains("Must provide input.fs.className"));
+  }
+
+  @Test
+  public void testRejectsLocalFileSystemClassForCustomSchemeWhenLocalFileSystemIsDisabled() {
+    Map<String, String> batchConfigMap = new HashMap<>();
+    batchConfigMap.put(BatchConfigProperties.INPUT_FS_CLASS, LocalPinotFS.class.getName());
+
+    IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
+        () -> FileIngestionHelper.copyURIToLocal(batchConfigMap, URI.create("customlocalfstest:///tmp/input.csv"),
+            DEST_FILE, false));
+
+    assertTrue(exception.getMessage().contains("Local filesystem implementation is not allowed"));
+  }
+
+  @Test
+  public void testRejectsInvalidFileSystemClassForCustomScheme() {
+    Map<String, String> batchConfigMap = new HashMap<>();
+    batchConfigMap.put(BatchConfigProperties.INPUT_FS_CLASS,
+        "org.apache.pinot.spi.filesystem.DoesNotExistPinotFS");
+
+    IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
+        () -> FileIngestionHelper.copyURIToLocal(batchConfigMap, URI.create("invalidlocalfstest:///tmp/input.csv"),
+            DEST_FILE, false));
+
+    assertTrue(exception.getMessage().contains("Invalid input.fs.className for /ingestFromURI"));
+  }
+
+  @Test
+  public void testRejectsLegacyLocalFileSystemClassForCustomSchemeWhenLocalFileSystemIsDisabled() {
+    Map<String, String> batchConfigMap = new HashMap<>();
+    batchConfigMap.put(BatchConfigProperties.INPUT_FS_CLASS, "org.apache.pinot.filesystem.LocalPinotFS");
+
+    IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
+        () -> FileIngestionHelper.copyURIToLocal(batchConfigMap, URI.create("legacylocalfstest:///tmp/input.csv"),
+            DEST_FILE, false));
+
+    assertTrue(exception.getMessage().contains("Local filesystem implementation is not allowed"));
+  }
+
+  @Test
+  public void testRejectsRegisteredLocalFileSystemSchemeWhenLocalFileSystemIsDisabled() {
+    PinotFSFactory.register("registeredlocalfstest", LocalPinotFS.class.getName(), new PinotConfiguration());
+
+    IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
+        () -> FileIngestionHelper.copyURIToLocal(new HashMap<>(), URI.create("registeredlocalfstest:///tmp/input.csv"),
+            DEST_FILE, false));
+
+    assertTrue(exception.getMessage().contains("Local filesystem implementation is not allowed"));
+  }
+
+  @Test
+  public void testRejectsLocalFileSystemSubclassForCustomSchemeWhenLocalFileSystemIsDisabled() {
+    Map<String, String> batchConfigMap = new HashMap<>();
+    batchConfigMap.put(BatchConfigProperties.INPUT_FS_CLASS, TestLocalPinotFS.class.getName());
+
+    IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
+        () -> FileIngestionHelper.copyURIToLocal(batchConfigMap,
+            URI.create("customlocalfssubclasstest:///tmp/input.csv"), DEST_FILE, false));
+
+    assertTrue(exception.getMessage().contains("Local filesystem implementation is not allowed"));
+  }
+
+  public static class TestLocalPinotFS extends LocalPinotFS {
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java
index eafba8a2561..cb78c16342f 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java
@@ -83,6 +83,17 @@ public class PinotFSFactory {
     return PINOT_FS_MAP.containsKey(scheme);
   }
 
+  public static boolean isSchemeRegisteredWith(String scheme, Class<? extends PinotFS> pinotFSClass) {
+    PinotFS pinotFS = PINOT_FS_MAP.get(scheme);
+    if (pinotFS == null) {
+      return false;
+    }
+    if (pinotFS instanceof NoClosePinotFS) {
+      pinotFS = ((NoClosePinotFS) pinotFS)._delegate;
+    }
+    return pinotFSClass.isInstance(pinotFS);
+  }
+
   public static void shutdown()
       throws IOException {
     for (PinotFS pinotFS : PINOT_FS_MAP.values()) {
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/filesystem/PinotFSFactoryTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/filesystem/PinotFSFactoryTest.java
index 248d92469a2..77172f90ca0 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/filesystem/PinotFSFactoryTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/filesystem/PinotFSFactoryTest.java
@@ -36,6 +36,9 @@ public class PinotFSFactoryTest {
     PinotFSFactory.init(new PinotConfiguration());
     NoClosePinotFS pinotFS = (NoClosePinotFS) PinotFSFactory.create("file");
     Assert.assertTrue(pinotFS._delegate instanceof LocalPinotFS);
+    Assert.assertTrue(PinotFSFactory.isSchemeRegisteredWith("file", LocalPinotFS.class));
+    Assert.assertFalse(PinotFSFactory.isSchemeRegisteredWith("file", TestPinotFS.class));
+    Assert.assertFalse(PinotFSFactory.isSchemeRegisteredWith("missing", LocalPinotFS.class));
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to