This is an automated email from the ASF dual-hosted git repository.
xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new dd6520c7267 Validate ingestFromURI filesystem sources (#18660)
dd6520c7267 is described below
commit dd6520c726761a6988107b96bfa41daf6617e522
Author: Xiang Fu <[email protected]>
AuthorDate: Wed Jun 3 18:03:01 2026 -0700
Validate ingestFromURI filesystem sources (#18660)
---
.../apache/pinot/controller/ControllerConf.java | 7 +
.../resources/PinotIngestionRestletResource.java | 3 +-
.../pinot/controller/util/FileIngestionHelper.java | 45 ++++++-
...PinotIngestionRestletResourceStatelessTest.java | 5 +-
.../controller/util/FileIngestionHelperTest.java | 148 +++++++++++++++++++++
.../pinot/spi/filesystem/PinotFSFactory.java | 11 ++
.../pinot/spi/filesystem/PinotFSFactoryTest.java | 3 +
7 files changed, 216 insertions(+), 6 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index c4d9b131d19..9020f0be211 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -354,6 +354,8 @@ public class ControllerConf extends PinotConfiguration {
public static final String TABLE_MIN_REPLICAS = "table.minReplicas";
public static final String JERSEY_ADMIN_API_PORT = "jersey.admin.api.port";
public static final String JERSEY_ADMIN_IS_PRIMARY =
"jersey.admin.isprimary";
+ public static final String INGEST_FROM_URI_ALLOW_LOCAL_FILE_SYSTEM =
+ "controller.ingestFromURI.allowLocalFileSystem";
public static final String ACCESS_CONTROL_FACTORY_CLASS =
"controller.admin.access.control.factory.class";
public static final String ACCESS_CONTROL_USERNAME =
"access.control.init.username";
public static final String ACCESS_CONTROL_PASSWORD =
"access.control.init.password";
@@ -402,6 +404,7 @@ public class ControllerConf extends PinotConfiguration {
public static final int DEFAULT_DELETED_SEGMENTS_RETENTION_IN_DAYS = 7;
public static final int DEFAULT_TABLE_MIN_REPLICAS = 1;
public static final int DEFAULT_JERSEY_ADMIN_PORT = 21000;
+ public static final boolean DEFAULT_INGEST_FROM_URI_ALLOW_LOCAL_FILE_SYSTEM
= false;
public static final String DEFAULT_ACCESS_CONTROL_FACTORY_CLASS =
"org.apache.pinot.controller.api.access.AllowAllAccessFactory";
public static final String DEFAULT_ACCESS_CONTROL_USERNAME = "admin";
@@ -1193,6 +1196,10 @@ public class ControllerConf extends PinotConfiguration {
return getProperty(JERSEY_ADMIN_API_PORT,
String.valueOf(DEFAULT_JERSEY_ADMIN_PORT));
}
+ public boolean isIngestFromUriLocalFileSystemAllowed() { // reads the controller flag that opts in to local-filesystem sources for ingestFromURI
+ return getProperty(INGEST_FROM_URI_ALLOW_LOCAL_FILE_SYSTEM,
DEFAULT_INGEST_FROM_URI_ALLOW_LOCAL_FILE_SYSTEM); // second argument is the fallback value; this patch declares that default constant as false
+ }
+
public void setInitAccessControlUsername(String username) {
setProperty(ACCESS_CONTROL_USERNAME, username);
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java
index 5b8aae573ef..4bcce019729 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotIngestionRestletResource.java
@@ -234,7 +234,8 @@ public class PinotIngestionRestletResource {
FileIngestionHelper fileIngestionHelper =
new FileIngestionHelper(tableConfig, schema, batchConfigMap,
getControllerUri(),
- new File(_controllerConf.getLocalTempDir(), INGESTION_DIR),
authProvider);
+ new File(_controllerConf.getLocalTempDir(), INGESTION_DIR),
authProvider,
+ _controllerConf.isIngestFromUriLocalFileSystemAllowed());
return fileIngestionHelper.buildSegmentAndPush(payload);
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
index 5d0f9195bd2..c9ad75c511b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
@@ -41,6 +41,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.filesystem.LocalPinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.ingestion.segment.uploader.SegmentUploader;
@@ -73,15 +74,17 @@ public class FileIngestionHelper {
private final URI _controllerUri;
private final File _ingestionDir;
private final AuthProvider _authProvider;
+ private final boolean _allowLocalFileSystemInUri;
public FileIngestionHelper(TableConfig tableConfig, Schema schema,
Map<String, String> batchConfigMap,
- URI controllerUri, File ingestionDir, AuthProvider authProvider) {
+ URI controllerUri, File ingestionDir, AuthProvider authProvider, boolean
allowLocalFileSystemInUri) { // new trailing flag: whether URI payloads may be read through a local filesystem
_tableConfig = tableConfig;
_schema = schema;
_batchConfigMap = batchConfigMap;
_controllerUri = controllerUri;
_ingestionDir = ingestionDir;
_authProvider = authProvider;
+ _allowLocalFileSystemInUri = allowLocalFileSystemInUri; // stored as-is; the URI payload branch passes it to copyURIToLocal
}
/**
@@ -112,7 +115,7 @@ public class FileIngestionHelper {
File inputFile = new File(inputDir, String.format(
"%s.%s", DATA_FILE_PREFIX,
_batchConfigMap.get(BatchConfigProperties.INPUT_FORMAT).toLowerCase()));
if (payload._payloadType == PayloadType.URI) {
- copyURIToLocal(_batchConfigMap, payload._uri, inputFile);
+ copyURIToLocal(_batchConfigMap, payload._uri, inputFile,
_allowLocalFileSystemInUri);
LOGGER.info("Copied from URI: {} to local file: {}", payload._uri,
inputFile.getAbsolutePath());
} else {
copyMultipartToLocal(payload._multiPart, inputFile);
@@ -176,14 +179,48 @@ public class FileIngestionHelper {
*/
public static void copyURIToLocal(Map<String, String> batchConfigMap, URI
sourceFileURI, File destFile)
throws Exception {
+ copyURIToLocal(batchConfigMap, sourceFileURI, destFile, true); // existing 3-arg entry point now delegates with allowLocalFileSystem=true, so none of the local-filesystem rejections apply here
+ }
+
+ public static void copyURIToLocal(Map<String, String> batchConfigMap, URI
sourceFileURI, File destFile,
+ boolean allowLocalFileSystem) // when false, scheme-less URIs, the local scheme and LocalPinotFS-backed schemes are all rejected below
+ throws Exception { // copies sourceFileURI to destFile through the PinotFS obtained for the URI scheme
String sourceFileURIScheme = sourceFileURI.getScheme();
+ Preconditions.checkArgument(allowLocalFileSystem ||
StringUtils.isNotBlank(sourceFileURIScheme),
+ "Source URI must include a scheme: %s", sourceFileURI); // a blank scheme is tolerated only when local filesystem access is allowed
+ Preconditions.checkArgument(allowLocalFileSystem
+ ||
!PinotFSFactory.LOCAL_PINOT_FS_SCHEME.equalsIgnoreCase(sourceFileURIScheme),
+ "Local filesystem URIs are not allowed for /ingestFromURI: %s",
sourceFileURI); // comparison with the local scheme constant ignores letter case
if (!PinotFSFactory.isSchemeSupported(sourceFileURIScheme)) { // nothing registered for this scheme yet: register the class named in the batch config
- PinotFSFactory.register(sourceFileURIScheme,
batchConfigMap.get(BatchConfigProperties.INPUT_FS_CLASS),
- IngestionConfigUtils.getInputFsProps(batchConfigMap));
+ String inputFsClassName =
batchConfigMap.get(BatchConfigProperties.INPUT_FS_CLASS);
+ Preconditions.checkArgument(StringUtils.isNotBlank(inputFsClassName),
+ "Must provide %s for unsupported scheme: %s",
BatchConfigProperties.INPUT_FS_CLASS, sourceFileURIScheme); // fail with a clear message instead of registering a blank class name
+ Preconditions.checkArgument(allowLocalFileSystem ||
!isLocalPinotFSClassName(inputFsClassName), // name-based pre-check, evaluated before anything is registered
+ "Local filesystem implementation is not allowed for /ingestFromURI:
%s", inputFsClassName);
+ try {
+ PinotFSFactory.register(sourceFileURIScheme, inputFsClassName,
+ IngestionConfigUtils.getInputFsProps(batchConfigMap));
+ } catch (RuntimeException e) { // registration failures are rethrown as IllegalArgumentException with the original cause attached
+ throw new IllegalArgumentException(
+ String.format("Invalid %s for /ingestFromURI: %s",
BatchConfigProperties.INPUT_FS_CLASS,
+ inputFsClassName), e);
+ }
}
+ Preconditions.checkArgument(allowLocalFileSystem
+ || !PinotFSFactory.isSchemeRegisteredWith(sourceFileURIScheme,
LocalPinotFS.class), // instance-based check on whatever is registered now. NOTE(review): a LocalPinotFS subclass passes the name pre-check, so its scheme is registered before this check rejects the request - confirm that leaving the registration in place is intended
+ "Local filesystem implementation is not allowed for /ingestFromURI:
%s", sourceFileURIScheme);
PinotFSFactory.create(sourceFileURIScheme).copyToLocalFile(sourceFileURI,
destFile);
}
+ private static boolean isLocalPinotFSClassName(String className) { // true when the normalized class name equals the fully-qualified name of LocalPinotFS (exact match, subclasses do not match)
+ String rawClassName = className;
+ int pluginSeparatorIndex = className.indexOf(':');
+ if (pluginSeparatorIndex >= 0) { // a colon is treated as a separator: only the text after the first colon is kept
+ rawClassName = className.substring(pluginSeparatorIndex + 1);
+ }
+ return
LocalPinotFS.class.getName().equals(PluginManager.loadClassWithBackwardCompatibleCheck(rawClassName)); // NOTE(review): presumably this call maps legacy class names to their current names - confirm in PluginManager
+ }
+
/**
* Copy the file from the uploaded multipart to a local file
*/
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotIngestionRestletResourceStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotIngestionRestletResourceStatelessTest.java
index b5a17ad85c7..e41afd2eb05 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotIngestionRestletResourceStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotIngestionRestletResourceStatelessTest.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.client.admin.PinotAdminClient;
+import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -56,7 +57,9 @@ public class PinotIngestionRestletResourceStatelessTest
extends ControllerTest {
public void setUp()
throws Exception {
startZk();
- startController();
+ Map<String, Object> controllerConfig = getDefaultControllerConfiguration();
+
controllerConfig.put(ControllerConf.INGEST_FROM_URI_ALLOW_LOCAL_FILE_SYSTEM,
true);
+ startController(controllerConfig);
addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
addFakeServerInstancesToAutoJoinHelixCluster(1, true);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/util/FileIngestionHelperTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/util/FileIngestionHelperTest.java
new file mode 100644
index 00000000000..dab8d6eef0d
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/util/FileIngestionHelperTest.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.util;
+
+import java.io.File;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.LocalPinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
+
+
+@Test(groups = "stateless")
+public class FileIngestionHelperTest { // exercises the allowLocalFileSystem gating of FileIngestionHelper.copyURIToLocal
+ private static final File DEST_FILE = new File("unused"); // placeholder destination for the tests that expect copyURIToLocal to throw
+
+ @Test
+ public void testCopiesLocalFileUriWhenLocalFileSystemIsEnabled()
+ throws Exception { // happy path: a temp file is copied through its file URI when allowLocalFileSystem is true
+ Path tempDir = Files.createTempDirectory("pinot-ingest-uri-test");
+ try {
+ Path inputFile = tempDir.resolve("input.csv");
+ Path destFile = tempDir.resolve("dest.csv");
+ Files.writeString(inputFile, "col\nvalue\n", StandardCharsets.UTF_8);
+
+ FileIngestionHelper.copyURIToLocal(new HashMap<>(), inputFile.toUri(),
destFile.toFile(), true);
+
+ assertEquals(Files.readString(destFile, StandardCharsets.UTF_8),
"col\nvalue\n");
+ } finally {
+ FileUtils.deleteQuietly(tempDir.toFile()); // remove the temp directory whether or not the assertions passed
+ }
+ }
+
+ @Test
+ public void testRejectsLocalFileUriWhenLocalFileSystemIsDisabled() { // a file-scheme URI must be refused when allowLocalFileSystem is false
+ IllegalArgumentException exception =
expectThrows(IllegalArgumentException.class,
+ () -> FileIngestionHelper.copyURIToLocal(new HashMap<>(),
URI.create("file:///tmp/input.csv"), DEST_FILE,
+ false));
+
+ assertTrue(exception.getMessage().contains("Local filesystem URIs are not
allowed"));
+ }
+
+ @Test
+ public void testRejectsSchemeLessUriWhenLocalFileSystemIsDisabled() { // a URI with no scheme must be refused when allowLocalFileSystem is false
+ IllegalArgumentException exception =
expectThrows(IllegalArgumentException.class,
+ () -> FileIngestionHelper.copyURIToLocal(new HashMap<>(),
URI.create("/tmp/input.csv"), DEST_FILE, false));
+
+ assertTrue(exception.getMessage().contains("Source URI must include a
scheme"));
+ }
+
+ @Test
+ public void testPreservesSchemeLessUriHandlingWhenLocalFileSystemIsEnabled()
{ // with allowLocalFileSystem true the scheme checks are skipped, so the failure comes from the missing input FS class
+ IllegalArgumentException exception =
expectThrows(IllegalArgumentException.class,
+ () -> FileIngestionHelper.copyURIToLocal(new HashMap<>(),
URI.create("/tmp/input.csv"), DEST_FILE, true));
+
+ assertTrue(exception.getMessage().contains("Must provide
input.fs.className"));
+ }
+
+ @Test
+ public void
testRejectsLocalFileSystemClassForCustomSchemeWhenLocalFileSystemIsDisabled() { // LocalPinotFS named as the input FS class for a scheme this test does not register
+ Map<String, String> batchConfigMap = new HashMap<>();
+ batchConfigMap.put(BatchConfigProperties.INPUT_FS_CLASS,
LocalPinotFS.class.getName());
+
+ IllegalArgumentException exception =
expectThrows(IllegalArgumentException.class,
+ () -> FileIngestionHelper.copyURIToLocal(batchConfigMap,
URI.create("customlocalfstest:///tmp/input.csv"),
+ DEST_FILE, false));
+
+ assertTrue(exception.getMessage().contains("Local filesystem
implementation is not allowed"));
+ }
+
+ @Test
+ public void testRejectsInvalidFileSystemClassForCustomScheme() { // a class name that cannot be registered must be reported as an invalid input FS class
+ Map<String, String> batchConfigMap = new HashMap<>();
+ batchConfigMap.put(BatchConfigProperties.INPUT_FS_CLASS,
+ "org.apache.pinot.spi.filesystem.DoesNotExistPinotFS");
+
+ IllegalArgumentException exception =
expectThrows(IllegalArgumentException.class,
+ () -> FileIngestionHelper.copyURIToLocal(batchConfigMap,
URI.create("invalidlocalfstest:///tmp/input.csv"),
+ DEST_FILE, false));
+
+ assertTrue(exception.getMessage().contains("Invalid input.fs.className for
/ingestFromURI"));
+ }
+
+ @Test
+ public void
testRejectsLegacyLocalFileSystemClassForCustomSchemeWhenLocalFileSystemIsDisabled()
{ // same expectation when LocalPinotFS is named by its legacy package name
+ Map<String, String> batchConfigMap = new HashMap<>();
+ batchConfigMap.put(BatchConfigProperties.INPUT_FS_CLASS,
"org.apache.pinot.filesystem.LocalPinotFS");
+
+ IllegalArgumentException exception =
expectThrows(IllegalArgumentException.class,
+ () -> FileIngestionHelper.copyURIToLocal(batchConfigMap,
URI.create("legacylocalfstest:///tmp/input.csv"),
+ DEST_FILE, false));
+
+ assertTrue(exception.getMessage().contains("Local filesystem
implementation is not allowed"));
+ }
+
+ @Test
+ public void
testRejectsRegisteredLocalFileSystemSchemeWhenLocalFileSystemIsDisabled() { // scheme is registered with LocalPinotFS up front and the batch config carries no input FS class
+ PinotFSFactory.register("registeredlocalfstest",
LocalPinotFS.class.getName(), new PinotConfiguration()); // NOTE(review): registration goes into the shared factory and is not undone within this test
+
+ IllegalArgumentException exception =
expectThrows(IllegalArgumentException.class,
+ () -> FileIngestionHelper.copyURIToLocal(new HashMap<>(),
URI.create("registeredlocalfstest:///tmp/input.csv"),
+ DEST_FILE, false));
+
+ assertTrue(exception.getMessage().contains("Local filesystem
implementation is not allowed"));
+ }
+
+ @Test
+ public void
testRejectsLocalFileSystemSubclassForCustomSchemeWhenLocalFileSystemIsDisabled()
{ // a LocalPinotFS subclass has a different class name, yet the same rejection message is expected
+ Map<String, String> batchConfigMap = new HashMap<>();
+ batchConfigMap.put(BatchConfigProperties.INPUT_FS_CLASS,
TestLocalPinotFS.class.getName());
+
+ IllegalArgumentException exception =
expectThrows(IllegalArgumentException.class,
+ () -> FileIngestionHelper.copyURIToLocal(batchConfigMap,
+ URI.create("customlocalfssubclasstest:///tmp/input.csv"),
DEST_FILE, false));
+
+ assertTrue(exception.getMessage().contains("Local filesystem
implementation is not allowed"));
+ }
+
+ public static class TestLocalPinotFS extends LocalPinotFS { // empty LocalPinotFS subclass used by the subclass test
+ }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java
index eafba8a2561..cb78c16342f 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFSFactory.java
@@ -83,6 +83,17 @@ public class PinotFSFactory {
return PINOT_FS_MAP.containsKey(scheme);
}
+ public static boolean isSchemeRegisteredWith(String scheme, Class<? extends
PinotFS> pinotFSClass) { // reports whether the PinotFS registered under scheme is an instance of pinotFSClass
+ PinotFS pinotFS = PINOT_FS_MAP.get(scheme);
+ if (pinotFS == null) { // nothing is registered under this scheme
+ return false;
+ }
+ if (pinotFS instanceof NoClosePinotFS) { // look at the wrapped filesystem rather than the NoClosePinotFS wrapper
+ pinotFS = ((NoClosePinotFS) pinotFS)._delegate;
+ }
+ return pinotFSClass.isInstance(pinotFS); // isInstance is also true for subclasses of pinotFSClass
+ }
+
public static void shutdown()
throws IOException {
for (PinotFS pinotFS : PINOT_FS_MAP.values()) {
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/filesystem/PinotFSFactoryTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/filesystem/PinotFSFactoryTest.java
index 248d92469a2..77172f90ca0 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/filesystem/PinotFSFactoryTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/filesystem/PinotFSFactoryTest.java
@@ -36,6 +36,9 @@ public class PinotFSFactoryTest {
PinotFSFactory.init(new PinotConfiguration());
NoClosePinotFS pinotFS = (NoClosePinotFS) PinotFSFactory.create("file");
Assert.assertTrue(pinotFS._delegate instanceof LocalPinotFS);
+ Assert.assertTrue(PinotFSFactory.isSchemeRegisteredWith("file",
LocalPinotFS.class));
+ Assert.assertFalse(PinotFSFactory.isSchemeRegisteredWith("file",
TestPinotFS.class));
+ Assert.assertFalse(PinotFSFactory.isSchemeRegisteredWith("missing",
LocalPinotFS.class));
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]