This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 276633679fa Decouple Flink connector from pinot-controller (#18054)
276633679fa is described below
commit 276633679fac8829d114a36bc7913c7cc0ed2e9e
Author: Xiang Fu <[email protected]>
AuthorDate: Thu Apr 9 17:52:33 2026 -0700
Decouple Flink connector from pinot-controller (#18054)
- Replace ControllerRequestClient with PinotAdminClient in Flink connector
- Add typed TableConfig/Schema fetch APIs to pinot-java-client
- Merge batch config overrides into the existing batch config map instead of appending a new one (SegmentUploaderDefault
requires exactly one batch config map)
- Pass tableType query param in getTableConfig for efficient filtering
- Preserve controller path prefix in FlinkQuickStart for proxied deployments
- Use SLF4J parameterized logging in PinotAdminClientExample
- Update README with new PinotAdminClient API usage
Co-authored-by: Claude Opus 4.6 (1M context) <[email protected]>
---
.../client/admin/PinotAdminClientExample.java | 79 ++++++-----
.../java/org/apache/pinot/client/admin/README.md | 8 +-
.../pinot/client/admin/TableAdminClient.java | 8 ++
.../pinot/client/admin/PinotAdminClientTest.java | 17 +++
pinot-connectors/pinot-flink-connector/README.md | 66 ++++++---
pinot-connectors/pinot-flink-connector/pom.xml | 11 +-
.../pinot/connector/flink/FlinkQuickStart.java | 27 +++-
.../connector/flink/sink/PinotSinkFunction.java | 57 ++++++++
.../flink/sink/PinotSinkFunctionTest.java | 84 +++++++++++
.../api/PinotTableRestletResourceTest.java | 153 +++++++++++----------
10 files changed, 377 insertions(+), 133 deletions(-)
diff --git
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminClientExample.java
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminClientExample.java
index 3f567a0a5e2..ec376b3cf77 100644
---
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminClientExample.java
+++
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminClientExample.java
@@ -18,14 +18,22 @@
*/
package org.apache.pinot.client.admin;
+import java.io.IOException;
import java.util.Map;
import java.util.Properties;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Example demonstrating how to use PinotAdminClient.
*/
public class PinotAdminClientExample {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PinotAdminClientExample.class);
+
private PinotAdminClientExample() {
}
@@ -34,7 +42,7 @@ public class PinotAdminClientExample {
try (PinotAdminClient adminClient = new
PinotAdminClient("localhost:9000")) {
exampleBasicUsage(adminClient);
} catch (Exception e) {
- System.err.println("Error in basic usage example: " + e.getMessage());
+ LOGGER.error("Error in basic usage example", e);
}
// Example 2: Usage with basic authentication
@@ -42,102 +50,107 @@ public class PinotAdminClientExample {
Properties properties = new Properties();
properties.setProperty("pinot.admin.request.timeout.ms", "30000");
- PinotAdminClient adminClient = new PinotAdminClient("localhost:9000",
properties,
+ try (PinotAdminClient adminClient = new
PinotAdminClient("localhost:9000", properties,
PinotAdminAuthentication.AuthType.BASIC,
- Map.of("username", "admin", "password", "password"));
-
- exampleWithAuthentication(adminClient);
- adminClient.close();
+ Map.of("username", "admin", "password", "password"))) {
+ exampleWithAuthentication(adminClient);
+ }
} catch (Exception e) {
- System.err.println("Error in authentication example: " + e.getMessage());
+ LOGGER.error("Error in authentication example", e);
}
// Example 3: Usage with bearer token authentication
try {
Properties properties = new Properties();
- PinotAdminClient adminClient = new PinotAdminClient("localhost:9000",
properties,
+ try (PinotAdminClient adminClient = new
PinotAdminClient("localhost:9000", properties,
PinotAdminAuthentication.AuthType.BEARER,
- Map.of("token", "your-bearer-token"));
-
- exampleWithBearerAuth(adminClient);
- adminClient.close();
+ Map.of("token", "your-bearer-token"))) {
+ exampleWithBearerAuth(adminClient);
+ }
} catch (Exception e) {
- System.err.println("Error in bearer auth example: " + e.getMessage());
+ LOGGER.error("Error in bearer auth example", e);
}
}
private static void exampleBasicUsage(PinotAdminClient adminClient)
throws PinotAdminException {
- System.out.println("=== Basic Usage Example ===");
+ LOGGER.info("=== Basic Usage Example ===");
try {
// List tables
var tables = adminClient.getTableClient().listTables(null, null, null);
- System.out.println("Tables: " + tables);
+ LOGGER.info("Tables: {}", tables);
// List schemas
var schemas = adminClient.getSchemaClient().listSchemaNames();
- System.out.println("Schemas: " + schemas);
+ LOGGER.info("Schemas: {}", schemas);
// List instances
var instances = adminClient.getInstanceClient().listInstances();
- System.out.println("Instances: " + instances);
+ LOGGER.info("Instances: {}", instances);
// List tenants
var tenants = adminClient.getTenantClient().listTenants();
- System.out.println("Tenants: " + tenants);
+ LOGGER.info("Tenants: {}", tenants);
// List task types
var taskTypes = adminClient.getTaskClient().listTaskTypes();
- System.out.println("Task types: " + taskTypes);
+ LOGGER.info("Task types: {}", taskTypes);
} catch (PinotAdminException e) {
- System.out.println("Admin operation failed: " + e.getMessage());
+ LOGGER.error("Admin operation failed", e);
}
}
private static void exampleWithAuthentication(PinotAdminClient adminClient)
- throws PinotAdminException {
- System.out.println("=== Authentication Example ===");
+ throws IOException {
+ LOGGER.info("=== Authentication Example ===");
try {
// Get a specific table configuration
String tableConfig =
adminClient.getTableClient().getTableConfig("myTable");
- System.out.println("Table config: " + tableConfig);
+ LOGGER.info("Table config: {}", tableConfig);
+
+ TableConfig offlineTableConfig =
+ adminClient.getTableClient().getTableConfigObjectForType("myTable",
TableType.OFFLINE);
+ LOGGER.info("Typed table config: {}", offlineTableConfig.getTableName());
+
+ Schema schema = adminClient.getSchemaClient().getSchemaObject("myTable");
+ LOGGER.info("Typed schema: {}", schema.getSchemaName());
// Validate a schema
String schemaConfig =
"{\"schemaName\":\"testSchema\",\"dimensionFieldSpecs\":[{\"name\":\"id\",\"dataType\":\"INT\"}]}";
String validationResult =
adminClient.getSchemaClient().validateSchema(schemaConfig);
- System.out.println("Schema validation: " + validationResult);
+ LOGGER.info("Schema validation: {}", validationResult);
} catch (PinotAdminAuthenticationException e) {
- System.out.println("Authentication failed: " + e.getMessage());
+ LOGGER.error("Authentication failed", e);
} catch (PinotAdminNotFoundException e) {
- System.out.println("Resource not found: " + e.getMessage());
+ LOGGER.error("Resource not found", e);
} catch (PinotAdminException e) {
- System.out.println("Admin operation failed: " + e.getMessage());
+ LOGGER.error("Admin operation failed", e);
}
}
private static void exampleWithBearerAuth(PinotAdminClient adminClient)
throws PinotAdminException {
- System.out.println("=== Bearer Authentication Example ===");
+ LOGGER.info("=== Bearer Authentication Example ===");
try {
// Create a new schema
String schemaConfig =
"{\"schemaName\":\"exampleSchema\",\"dimensionFieldSpecs\":[{\"name\":\"id\",\"dataType\":\"INT\"}]}";
String createResult =
adminClient.getSchemaClient().createSchema(schemaConfig);
- System.out.println("Schema creation: " + createResult);
+ LOGGER.info("Schema creation: {}", createResult);
// Get instance information
var liveInstances = adminClient.getInstanceClient().listLiveInstances();
- System.out.println("Live instances: " + liveInstances);
+ LOGGER.info("Live instances: {}", liveInstances);
} catch (PinotAdminAuthenticationException e) {
- System.out.println("Authentication failed: " + e.getMessage());
+ LOGGER.error("Authentication failed", e);
} catch (PinotAdminValidationException e) {
- System.out.println("Validation failed: " + e.getMessage());
+ LOGGER.error("Validation failed", e);
} catch (PinotAdminException e) {
- System.out.println("Admin operation failed: " + e.getMessage());
+ LOGGER.error("Admin operation failed", e);
}
}
}
diff --git
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/README.md
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/README.md
index bae4eb861b0..270397ec533 100644
---
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/README.md
+++
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/README.md
@@ -55,6 +55,9 @@ The admin client consists of:
```java
import org.apache.pinot.client.admin.*;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
// Create client without authentication
try(PinotAdminClient adminClient = new PinotAdminClient("localhost:9000")){
@@ -64,6 +67,9 @@ List<String> tables =
adminClient.getTableClient().listTables(null, null, null);
// Get a specific table configuration
String config = adminClient.getTableClient().getTableConfig("myTable");
+// Get a typed table configuration
+TableConfig offlineConfig =
adminClient.getTableClient().getTableConfigObjectForType("myTable",
TableType.OFFLINE);
+
// List schemas
List<String> schemas = adminClient.getSchemaClient().listSchemaNames();
}
@@ -152,7 +158,7 @@ List<String> schemas = schemaClient.listSchemaNames();
// Get schema configuration as JSON
String schema = schemaClient.getSchema("mySchema");
-// Get a typed Schema object
+// Get a typed schema object
Schema schemaObject = schemaClient.getSchemaObject("mySchema");
// Create a new schema
diff --git
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/TableAdminClient.java
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/TableAdminClient.java
index 99996098b22..e34cbe3d7f7 100644
---
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/TableAdminClient.java
+++
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/TableAdminClient.java
@@ -126,6 +126,14 @@ public class TableAdminClient extends
BaseServiceAdminClient {
return response.toString();
}
+ /**
+ * Gets the configuration for a specific raw table and type as a typed
object.
+ */
+ public TableConfig getTableConfigObjectForType(String tableName, TableType
tableType)
+ throws PinotAdminException, IOException {
+ return getTableConfigObject(tableName, tableType.name());
+ }
+
/**
* Gets the configuration for a specific table as a typed object.
*/
diff --git
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/admin/PinotAdminClientTest.java
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/admin/PinotAdminClientTest.java
index edac49269c6..c6a9a820a01 100644
---
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/admin/PinotAdminClientTest.java
+++
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/admin/PinotAdminClientTest.java
@@ -139,6 +139,23 @@ public class PinotAdminClientTest {
eq(HEADERS));
}
+ @Test
+ public void testGetTypedTableConfig()
+ throws Exception {
+ TableConfig expectedTableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("tbl1").build();
+ String jsonResponse = "{\"OFFLINE\":" +
JsonUtils.objectToString(expectedTableConfig) + "}";
+ JsonNode mockResponse = new ObjectMapper().readTree(jsonResponse);
+ lenient().when(_mockTransport.executeGet(anyString(), anyString(), any(),
any()))
+ .thenReturn(mockResponse);
+
+ TableConfig tableConfig =
_adminClient.getTableClient().getTableConfigObjectForType("tbl1",
TableType.OFFLINE);
+
+ assertEquals(tableConfig.getTableName(), "tbl1_OFFLINE");
+ assertEquals(tableConfig.getTableType(), TableType.OFFLINE);
+ verify(_mockTransport).executeGet(eq(CONTROLLER_ADDRESS),
eq("/tables/tbl1"), eq(Map.of("type", "OFFLINE")),
+ eq(HEADERS));
+ }
+
@Test
public void testListSchemas()
throws Exception {
diff --git a/pinot-connectors/pinot-flink-connector/README.md
b/pinot-connectors/pinot-flink-connector/README.md
index 6013b3a5e81..278eac86a43 100644
--- a/pinot-connectors/pinot-flink-connector/README.md
+++ b/pinot-connectors/pinot-flink-connector/README.md
@@ -30,17 +30,28 @@ StreamExecutionEnvironment execEnv =
StreamExecutionEnvironment.getExecutionEnvi
execEnv.setParallelism(2); // optional
DataStream<Row> srcDs = execEnv.fromCollection(data).returns(TEST_TYPE_INFO)
-// Create a ControllerRequestClient to fetch Pinot schema and table config
-HttpClient httpClient = HttpClient.getInstance();
-ControllerRequestClient client = new ControllerRequestClient(
-ControllerRequestURLBuilder.baseUrl(DEFAULT_CONTROLLER_URL), httpClient);
+// Create a PinotAdminClient to fetch Pinot schema and table config
+String controllerUrl = "http://localhost:9000";
+URI controllerUri = URI.create(controllerUrl);
+String controllerAddress = controllerUri.getAuthority();
+String controllerPath = controllerUri.getPath();
+if (controllerPath != null && !controllerPath.isEmpty() &&
!"/".equals(controllerPath)) {
+ controllerAddress += controllerPath.endsWith("/") ?
controllerPath.substring(0, controllerPath.length() - 1)
+ : controllerPath;
+}
+Properties properties = new Properties();
+properties.setProperty(PinotAdminTransport.ADMIN_TRANSPORT_SCHEME,
controllerUri.getScheme());
-// fetch Pinot schema
-Schema schema = client.getSchemaClient().getSchemaObject("starbucksStores");
-// fetch Pinot table config
-TableConfig tableConfig =
client.getTableClient().getTableConfigObject("starbucksStores", "OFFLINE");
-// create Flink Pinot Sink
-srcDs.addSink(new PinotSinkFunction<>(new
PinotRowRecordConverter(TEST_TYPE_INFO), tableConfig, schema));
+try (PinotAdminClient client = new PinotAdminClient(controllerAddress,
properties)) {
+ // fetch Pinot schema
+ Schema schema = client.getSchemaClient().getSchemaObject("starbucksStores");
+ // fetch Pinot table config
+ TableConfig tableConfig =
+ client.getTableClient().getTableConfigObjectForType("starbucksStores",
TableType.OFFLINE);
+ // create Flink Pinot Sink
+ srcDs.addSink(new PinotSinkFunction<>(new
PinotRowRecordConverter(TEST_TYPE_INFO), tableConfig, schema,
+ controllerUrl));
+}
execEnv.execute();
```
@@ -48,22 +59,33 @@ execEnv.execute();
```java
// Set up flink env and data source
StreamExecutionEnvironment execEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
-execEnv.setParallelism(2); // mandatory for upsert tables wi
+execEnv.setParallelism(2); // mandatory for upsert tables
DataStream<Row> srcDs = execEnv.fromCollection(data).returns(TEST_TYPE_INFO)
-// Create a ControllerRequestClient to fetch Pinot schema and table config
-HttpClient httpClient = HttpClient.getInstance();
-ControllerRequestClient client = new ControllerRequestClient(
-ControllerRequestURLBuilder.baseUrl(DEFAULT_CONTROLLER_URL), httpClient);
+// Create a PinotAdminClient to fetch Pinot schema and table config
+String controllerUrl = "http://localhost:9000";
+URI controllerUri = URI.create(controllerUrl);
+String controllerAddress = controllerUri.getAuthority();
+String controllerPath = controllerUri.getPath();
+if (controllerPath != null && !controllerPath.isEmpty() &&
!"/".equals(controllerPath)) {
+ controllerAddress += controllerPath.endsWith("/") ?
controllerPath.substring(0, controllerPath.length() - 1)
+ : controllerPath;
+}
+Properties properties = new Properties();
+properties.setProperty(PinotAdminTransport.ADMIN_TRANSPORT_SCHEME,
controllerUri.getScheme());
-// fetch Pinot schema
-Schema schema = client.getSchemaClient().getSchemaObject("starbucksStores");
-// fetch Pinot table config
-TableConfig tableConfig =
client.getTableClient().getTableConfigObject("starbucksStores", "REALTIME");
+try (PinotAdminClient client = new PinotAdminClient(controllerAddress,
properties)) {
+ // fetch Pinot schema
+ Schema schema = client.getSchemaClient().getSchemaObject("starbucksStores");
+ // fetch Pinot table config
+ TableConfig tableConfig =
+ client.getTableClient().getTableConfigObjectForType("starbucksStores",
TableType.REALTIME);
-// create Flink Pinot Sink (partition it same as the realtime stream(e.g.
kafka) in case of upsert tables)
-srcDs.partitionCustom((Partitioner<Integer>) (key, partitions) -> key %
partitions, r -> (Integer) r.getField("primaryKey"))
- .addSink(new PinotSinkFunction<>(new
PinotRowRecordConverter(TEST_TYPE_INFO), tableConfig, schema));
+ // create Flink Pinot Sink (partition it the same way as the realtime stream (e.g.
Kafka) in case of upsert tables)
+ srcDs.partitionCustom((Partitioner<Integer>) (key, partitions) -> key %
partitions, r -> (Integer) r.getField("primaryKey"))
+ .addSink(new PinotSinkFunction<>(new
PinotRowRecordConverter(TEST_TYPE_INFO), tableConfig, schema,
+ controllerUrl));
+}
execEnv.execute();
```
diff --git a/pinot-connectors/pinot-flink-connector/pom.xml
b/pinot-connectors/pinot-flink-connector/pom.xml
index c9b9471f856..6e7ac204113 100644
--- a/pinot-connectors/pinot-flink-connector/pom.xml
+++ b/pinot-connectors/pinot-flink-connector/pom.xml
@@ -40,12 +40,16 @@
</dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
- <artifactId>pinot-controller</artifactId>
+ <artifactId>pinot-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-segment-writer-file-based</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-segment-uploader-default</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
@@ -63,6 +67,11 @@
</dependency>
<!-- Test Dependencies -->
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-controller</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-integration-test-base</artifactId>
diff --git
a/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/FlinkQuickStart.java
b/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/FlinkQuickStart.java
index 312a5b34788..dd818cd3168 100644
---
a/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/FlinkQuickStart.java
+++
b/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/FlinkQuickStart.java
@@ -21,9 +21,11 @@ package org.apache.pinot.connector.flink;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -31,9 +33,11 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.pinot.client.admin.PinotAdminClient;
+import org.apache.pinot.client.admin.PinotAdminTransport;
import org.apache.pinot.connector.flink.common.FlinkRowGenericRowConverter;
import org.apache.pinot.connector.flink.sink.PinotSinkFunction;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
@@ -47,7 +51,7 @@ public final class FlinkQuickStart {
public static final RowTypeInfo TEST_TYPE_INFO =
new RowTypeInfo(new TypeInformation[]{Types.FLOAT, Types.FLOAT,
Types.STRING, Types.STRING},
new String[]{"lon", "lat", "address", "name"});
- private static final String DEFAULT_CONTROLLER_ADDRESS = "localhost:9000";
+ private static final String DEFAULT_CONTROLLER_URL = "http://localhost:9000";
private static List<Row> loadData()
throws IOException {
@@ -76,11 +80,24 @@ public final class FlinkQuickStart {
execEnv.setParallelism(2);
DataStream<Row> srcDs =
execEnv.fromCollection(data).returns(TEST_TYPE_INFO).keyBy(r -> r.getField(0));
- try (PinotAdminClient client = new
PinotAdminClient(DEFAULT_CONTROLLER_ADDRESS)) {
+ URI controllerUri = URI.create(DEFAULT_CONTROLLER_URL);
+ String controllerAddress = controllerUri.getAuthority();
+ String controllerPath = controllerUri.getPath();
+ if (controllerPath != null && !controllerPath.isEmpty() &&
!"/".equals(controllerPath)) {
+ controllerAddress += controllerPath.endsWith("/") ?
controllerPath.substring(0, controllerPath.length() - 1)
+ : controllerPath;
+ }
+ Properties properties = new Properties();
+ properties.setProperty(PinotAdminTransport.ADMIN_TRANSPORT_SCHEME,
controllerUri.getScheme());
+
+ try (PinotAdminClient client = new PinotAdminClient(controllerAddress,
properties)) {
Schema schema =
client.getSchemaClient().getSchemaObject("starbucksStores");
- TableConfig tableConfig =
client.getTableClient().getTableConfigObject("starbucksStores", "OFFLINE");
- srcDs.addSink(new PinotSinkFunction<>(new
FlinkRowGenericRowConverter(TEST_TYPE_INFO), tableConfig, schema));
- execEnv.execute();
+ TableConfig tableConfig =
+
client.getTableClient().getTableConfigObjectForType("starbucksStores",
TableType.OFFLINE);
+ srcDs.addSink(
+ new PinotSinkFunction<>(new
FlinkRowGenericRowConverter(TEST_TYPE_INFO), tableConfig, schema,
+ DEFAULT_CONTROLLER_URL));
}
+ execEnv.execute();
}
}
diff --git
a/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/PinotSinkFunction.java
b/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/PinotSinkFunction.java
index e7204ef6b66..66ecfcd1ce7 100644
---
a/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/PinotSinkFunction.java
+++
b/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/PinotSinkFunction.java
@@ -19,6 +19,10 @@
package org.apache.pinot.connector.flink.sink;
import java.net.URI;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -31,7 +35,10 @@ import
org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.pinot.connector.flink.common.PinotGenericRowConverter;
import org.apache.pinot.plugin.segmentuploader.SegmentUploaderDefault;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.ingestion.segment.uploader.SegmentUploader;
import org.apache.pinot.spi.ingestion.segment.writer.SegmentWriter;
import org.slf4j.Logger;
@@ -49,6 +56,7 @@ public class PinotSinkFunction<T> extends RichSinkFunction<T>
implements Checkpo
public static final long DEFAULT_SEGMENT_FLUSH_MAX_NUM_RECORDS = 500000;
public static final int DEFAULT_EXECUTOR_POOL_SIZE = 5;
public static final long DEFAULT_EXECUTOR_SHUTDOWN_WAIT_MS = 3000;
+ public static final String DEFAULT_OUTPUT_DIR_URI = "/tmp/pinotoutput";
private static final long serialVersionUID = 1L;
private static final Logger LOG =
LoggerFactory.getLogger(PinotSinkFunction.class);
@@ -75,6 +83,16 @@ public class PinotSinkFunction<T> extends
RichSinkFunction<T> implements Checkpo
this(recordConverter, tableConfig, schema,
DEFAULT_SEGMENT_FLUSH_MAX_NUM_RECORDS, DEFAULT_EXECUTOR_POOL_SIZE);
}
+ /**
+ * Creates a sink using a table config fetched from the controller and
injects the upload settings required by the
+ * Flink connector.
+ */
+ public PinotSinkFunction(PinotGenericRowConverter<T> recordConverter,
TableConfig tableConfig, Schema schema,
+ String controllerBaseUrl) {
+ this(recordConverter, prepareTableConfigForSink(tableConfig,
controllerBaseUrl), schema,
+ DEFAULT_SEGMENT_FLUSH_MAX_NUM_RECORDS, DEFAULT_EXECUTOR_POOL_SIZE);
+ }
+
public PinotSinkFunction(PinotGenericRowConverter<T> recordConverter,
TableConfig tableConfig, Schema schema,
long segmentFlushMaxNumRecords, int executorPoolSize) {
this(recordConverter, tableConfig, schema, segmentFlushMaxNumRecords,
executorPoolSize, null, null);
@@ -92,6 +110,45 @@ public class PinotSinkFunction<T> extends
RichSinkFunction<T> implements Checkpo
_segmentUploadTimeMs = segmentUploadTimeMs;
}
+ /**
+ * Applies the connector-specific batch-ingestion defaults needed for
segment generation and upload.
+ */
+ static TableConfig prepareTableConfigForSink(TableConfig tableConfig, String
controllerBaseUrl) {
+ Map<String, String> requiredBatchConfig = new HashMap<>();
+ requiredBatchConfig.put(BatchConfigProperties.PUSH_CONTROLLER_URI,
controllerBaseUrl);
+ requiredBatchConfig.put(BatchConfigProperties.OUTPUT_DIR_URI,
DEFAULT_OUTPUT_DIR_URI);
+
+ IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
+ if (ingestionConfig == null) {
+ ingestionConfig = new IngestionConfig();
+ ingestionConfig.setBatchIngestionConfig(
+ new
BatchIngestionConfig(Collections.singletonList(requiredBatchConfig), "APPEND",
"HOURLY"));
+ tableConfig.setIngestionConfig(ingestionConfig);
+ return tableConfig;
+ }
+
+ BatchIngestionConfig batchIngestionConfig =
ingestionConfig.getBatchIngestionConfig();
+ if (batchIngestionConfig == null) {
+ ingestionConfig.setBatchIngestionConfig(
+ new
BatchIngestionConfig(Collections.singletonList(requiredBatchConfig), "APPEND",
"HOURLY"));
+ return tableConfig;
+ }
+
+ List<Map<String, String>> batchConfigMaps =
batchIngestionConfig.getBatchConfigMaps();
+ if (batchConfigMaps == null || batchConfigMaps.isEmpty()) {
+
batchIngestionConfig.setBatchConfigMaps(Collections.singletonList(requiredBatchConfig));
+ } else if (batchConfigMaps.size() > 1) {
+ throw new IllegalStateException(String.format(
+ "Flink connector requires exactly 1 batchConfigMap for table %s, got
%d", tableConfig.getTableName(),
+ batchConfigMaps.size()));
+ } else {
+ Map<String, String> mergedBatchConfig = new
HashMap<>(batchConfigMaps.get(0));
+ mergedBatchConfig.putAll(requiredBatchConfig);
+
batchIngestionConfig.setBatchConfigMaps(Collections.singletonList(mergedBatchConfig));
+ }
+ return tableConfig;
+ }
+
@Override
public void open(Configuration parameters)
throws Exception {
diff --git
a/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkFunctionTest.java
b/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkFunctionTest.java
new file mode 100644
index 00000000000..9fcaea26e68
--- /dev/null
+++
b/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkFunctionTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.flink.sink;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+
+public class PinotSinkFunctionTest {
+ @Test
+ public void testPrepareTableConfigForSinkAddsControllerBatchConfig() {
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").build();
+
+ TableConfig result =
PinotSinkFunction.prepareTableConfigForSink(tableConfig,
"http://localhost:9000");
+
+ assertNotNull(result.getIngestionConfig());
+ BatchIngestionConfig batchIngestionConfig =
result.getIngestionConfig().getBatchIngestionConfig();
+ assertNotNull(batchIngestionConfig);
+ Map<String, String> batchConfigMap =
batchIngestionConfig.getBatchConfigMaps().get(0);
+
assertEquals(batchConfigMap.get(BatchConfigProperties.PUSH_CONTROLLER_URI),
"http://localhost:9000");
+ assertEquals(batchConfigMap.get(BatchConfigProperties.OUTPUT_DIR_URI),
+ PinotSinkFunction.DEFAULT_OUTPUT_DIR_URI);
+ }
+
+ @Test
+ public void
testPrepareTableConfigForSinkMergesOverridesIntoExistingBatchConfig() {
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig
+ .setBatchIngestionConfig(new
BatchIngestionConfig(List.of(Map.of("existing", "value")), "APPEND", "DAILY"));
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable")
+ .setIngestionConfig(ingestionConfig)
+ .build();
+
+ TableConfig result =
PinotSinkFunction.prepareTableConfigForSink(tableConfig,
"http://localhost:9000");
+
+ List<Map<String, String>> batchConfigMaps =
+
result.getIngestionConfig().getBatchIngestionConfig().getBatchConfigMaps();
+ assertEquals(batchConfigMaps.size(), 1);
+ Map<String, String> mergedBatchConfigMap = batchConfigMaps.get(0);
+ assertEquals(mergedBatchConfigMap.get("existing"), "value");
+
assertEquals(mergedBatchConfigMap.get(BatchConfigProperties.PUSH_CONTROLLER_URI),
"http://localhost:9000");
+
assertEquals(mergedBatchConfigMap.get(BatchConfigProperties.OUTPUT_DIR_URI),
+ PinotSinkFunction.DEFAULT_OUTPUT_DIR_URI);
+ }
+
+ @Test(expectedExceptions = IllegalStateException.class,
+ expectedExceptionsMessageRegExp = ".*requires exactly 1
batchConfigMap.*")
+ public void testPrepareTableConfigForSinkRejectsMultipleBatchConfigs() {
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setBatchIngestionConfig(
+ new BatchIngestionConfig(List.of(Map.of("first", "value"),
Map.of("second", "value")), "APPEND", "DAILY"));
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable")
+ .setIngestionConfig(ingestionConfig)
+ .build();
+
+ PinotSinkFunction.prepareTableConfigForSink(tableConfig,
"http://localhost:9000");
+ }
+}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
index 9d86f827a03..cb29afe3d20 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
@@ -41,6 +41,7 @@ import org.apache.pinot.client.admin.ZookeeperAdminClient;
import org.apache.pinot.common.restlet.resources.RebalanceResult;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
+import
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext;
import org.apache.pinot.controller.helix.core.minion.TaskSchedulingInfo;
@@ -1204,13 +1205,15 @@ public class PinotTableRestletResourceTest extends
ControllerTest {
public void testTableTasksValidationWithDanglingTasks()
throws Exception {
String tableName = "testTableTasksValidationWithDangling";
+ String tableNameWithType = tableName + "_OFFLINE";
+ String taskType = MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE;
DEFAULT_INSTANCE.addDummySchema(tableName);
TableConfig offlineTableConfig = getOfflineTableBuilder(tableName)
.setTaskConfig(new TableTaskConfig(Map.of(
- MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE,
+ taskType,
Map.of(PinotTaskManager.SCHEDULE_KEY, "0 */10 * ? * * *",
- CommonConstants.TABLE_NAME, tableName + "_OFFLINE"))))
+ CommonConstants.TABLE_NAME, tableNameWithType))))
.build();
// First create the table successfully
@@ -1219,27 +1222,27 @@ public class PinotTableRestletResourceTest extends
ControllerTest {
// Create a task manually to simulate dangling task
PinotTaskManager taskManager =
DEFAULT_INSTANCE.getControllerStarter().getTaskManager();
TaskSchedulingContext context = new TaskSchedulingContext();
- context.setTablesToSchedule(Set.of(tableName + "_OFFLINE"));
+ context.setTablesToSchedule(Set.of(tableNameWithType));
Map<String, TaskSchedulingInfo> taskInfo =
taskManager.scheduleTasks(context);
String taskName =
taskInfo.values().iterator().next().getScheduledTaskNames().get(0);
- waitForTaskState(taskName, TaskState.IN_PROGRESS);
- boolean taskKnown = fetchTaskState(taskName) != null;
+ waitForTaskToBecomeActiveForCleanup(taskType, tableNameWithType, taskName);
// Now try to create another table with same name (simulating re-creation
with dangling tasks)
deleteTable(tableName, null, true);
try {
createTable(offlineTableConfig.toJsonString());
- if (taskKnown) {
- fail("Table creation should fail when dangling tasks exist");
- }
+ fail("Table creation should fail when dangling tasks exist");
} catch (IOException e) {
- if (taskKnown) {
- assertTrue(e.getMessage().contains("The table has dangling task
data"));
- }
+ assertTrue(e.getMessage().contains("The table has dangling task data"));
}
// Clean up any remaining tasks
+ try {
+ taskResourceManager().deleteTask(taskName, true);
+ } catch (Exception ignored) {
+ // Ignore if task no longer exists
+ }
try {
deleteTable(tableName, null, true);
} catch (Exception ignored) {
@@ -1268,13 +1271,15 @@ public class PinotTableRestletResourceTest extends
ControllerTest {
public void testTableTasksCleanupWithNonActiveTasks()
throws Exception {
String tableName = "testTableTasksCleanup";
+ String tableNameWithType = tableName + "_OFFLINE";
+ String taskType = MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE;
DEFAULT_INSTANCE.addDummySchema(tableName);
TableConfig offlineTableConfig = getOfflineTableBuilder(tableName)
.setTaskConfig(new TableTaskConfig(Map.of(
- MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE,
+ taskType,
Map.of(PinotTaskManager.SCHEDULE_KEY, "0 */10 * ? * * *",
- CommonConstants.TABLE_NAME, tableName + "_OFFLINE"))))
+ CommonConstants.TABLE_NAME, tableNameWithType))))
.build();
// Create table
@@ -1283,65 +1288,82 @@ public class PinotTableRestletResourceTest extends
ControllerTest {
// Create some completed tasks
PinotTaskManager taskManager =
DEFAULT_INSTANCE.getControllerStarter().getTaskManager();
TaskSchedulingContext context = new TaskSchedulingContext();
- context.setTablesToSchedule(Set.of(tableName + "_OFFLINE"));
- Map<String, TaskSchedulingInfo> taskInfo =
taskManager.scheduleTasks(context);
- String taskName =
taskInfo.values().iterator().next().getScheduledTaskNames().get(0);
- waitForTaskState(taskName, TaskState.IN_PROGRESS);
- boolean taskKnown = fetchTaskState(taskName) != null;
-
- if (taskKnown) {
- // stop the task queue to abort the task
-
taskClient().stopTaskQueue(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE);
- waitForTaskState(taskName, TaskState.STOPPED);
- // resume the task queue again to avoid affecting other tests
-
taskClient().resumeTaskQueue(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE);
+ context.setTablesToSchedule(Set.of(tableNameWithType));
+ taskManager.scheduleTasks(context);
+ taskResourceManager().stopTaskQueue(taskType);
+ try {
+ waitForTaskQueueState(taskType, TaskState.STOPPED);
+
+ // Delete table - should succeed and clean up tasks once the task stop
is visible to cleanup.
+ String deleteResponse = deleteTableWhenNoActiveTasksRemain(tableName);
+ assertEquals(deleteResponse, "{\"status\":\"Tables: [" +
tableNameWithType + "] deleted\"}");
+ } finally {
+ taskResourceManager().resumeTaskQueue(taskType);
}
+ }
- // Delete table - should succeed and clean up tasks
- String deleteResponse = deleteTable(tableName);
- assertEquals(deleteResponse, "{\"status\":\"Tables: [" + tableName +
"_OFFLINE] deleted\"}");
+ private PinotHelixTaskResourceManager taskResourceManager() {
+ return
DEFAULT_INSTANCE.getControllerStarter().getHelixTaskResourceManager();
+ }
+
+ private void waitForTaskQueueState(String taskType, TaskState expectedState)
{
+ TestUtils.waitForCondition((aVoid) ->
taskResourceManager().getTaskQueueState(taskType) == expectedState, 60_000,
+ "Task queue not in expected state " + expectedState);
}
- private void waitForTaskState(String taskName, TaskState expectedState) {
+ private void waitForTaskToBecomeActiveForCleanup(String taskType, String
tableNameWithType, String taskName) {
TestUtils.waitForCondition((aVoid) -> {
try {
- TaskState state = taskClient().getTaskState(taskName);
- if (state == null) {
- // If we cannot fetch state, treat IN_PROGRESS waits as best-effort
no-ops.
- return expectedState == TaskState.IN_PROGRESS;
- }
- // In test environments without a running minion, Helix can keep tasks
in NOT_STARTED even though they
- // are enqueued. Accept NOT_STARTED when we're only validating the
presence of an in-flight task.
- if (expectedState == TaskState.IN_PROGRESS && state ==
TaskState.NOT_STARTED) {
- return true;
- }
- return state == expectedState;
+ return isTaskActiveForCleanup(taskType, tableNameWithType, taskName);
} catch (Exception e) {
- // If state lookup fails, consider IN_PROGRESS satisfied (task was
enqueued) but surface other states.
- return expectedState == TaskState.IN_PROGRESS;
+ return false;
}
- }, 60_000, "Task not scheduled to expected state " + expectedState);
+ }, 60_000, "Task not active for table cleanup: " + taskName);
}
- private TaskState fetchTaskState(String taskName) {
- try {
- return taskClient().getTaskState(taskName);
- } catch (Exception e) {
- return null;
+ private boolean isTaskActiveForCleanup(String taskType, String
tableNameWithType, String taskName) {
+ TaskState taskState = taskResourceManager().getTaskStatesByTable(taskType,
tableNameWithType).get(taskName);
+ return taskState == TaskState.IN_PROGRESS &&
taskResourceManager().getTaskCount(taskName).getRunning() > 0;
+ }
+
+ private String deleteTableWhenNoActiveTasksRemain(String tableName)
+ throws IOException {
+ long deadlineMs = System.currentTimeMillis() + 60_000L;
+ IOException lastActiveTaskError = null;
+ while (System.currentTimeMillis() < deadlineMs) {
+ try {
+ return deleteTable(tableName);
+ } catch (IOException e) {
+ String message = e.getMessage();
+ if (message == null || !message.contains("active running tasks")) {
+ throw e;
+ }
+ lastActiveTaskError = e;
+ try {
+ Thread.sleep(100L);
+ } catch (InterruptedException interruptedException) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while waiting for task cleanup to
finish", interruptedException);
+ }
+ }
}
+ throw lastActiveTaskError != null ? lastActiveTaskError
+ : new IOException("Timed out waiting for table deletion to succeed");
}
@Test
public void testTableTasksCleanupWithActiveTasks()
throws Exception {
String tableName = "testTableTasksCleanupActive";
+ String tableNameWithType = tableName + "_OFFLINE";
+ String taskType = MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE;
DEFAULT_INSTANCE.addDummySchema(tableName);
TableConfig offlineTableConfig = getOfflineTableBuilder(tableName)
.setTaskConfig(new TableTaskConfig(Map.of(
- MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE,
+ taskType,
Map.of(PinotTaskManager.SCHEDULE_KEY, "0 */10 * ? * * *",
- CommonConstants.TABLE_NAME, tableName + "_OFFLINE"))))
+ CommonConstants.TABLE_NAME, tableNameWithType))))
.build();
// Create table
@@ -1350,38 +1372,27 @@ public class PinotTableRestletResourceTest extends
ControllerTest {
// Create an active/in-progress task
PinotTaskManager taskManager =
DEFAULT_INSTANCE.getControllerStarter().getTaskManager();
TaskSchedulingContext context = new TaskSchedulingContext();
- context.setTablesToSchedule(Set.of(tableName + "_OFFLINE"));
+ context.setTablesToSchedule(Set.of(tableNameWithType));
Map<String, TaskSchedulingInfo> taskInfo =
taskManager.scheduleTasks(context);
String taskName =
taskInfo.values().iterator().next().getScheduledTaskNames().get(0);
- waitForTaskState(taskName, TaskState.IN_PROGRESS);
- boolean taskKnown = fetchTaskState(taskName) != null;
+ waitForTaskToBecomeActiveForCleanup(taskType, tableNameWithType, taskName);
try {
// Try to delete table without ignoring active tasks - should fail
deleteTable(tableName);
- if (taskKnown) {
- fail("Table deletion should fail when active tasks exist");
- }
+ fail("Table deletion should fail when active tasks exist");
} catch (IOException e) {
- if (taskKnown) {
- assertTrue(e.getMessage().contains("The table has") &&
e.getMessage().contains("active running tasks"));
- }
+ assertTrue(e.getMessage().contains("The table has") &&
e.getMessage().contains("active running tasks"));
}
// Delete table with ignoreActiveTasks flag - should succeed
- try {
- String deleteResponse =
getOrCreateAdminClient().getTableClient().deleteTable(tableName, null, null,
true);
- assertEquals(deleteResponse, "{\"status\":\"Tables: [" + tableName +
"_OFFLINE] deleted\"}");
- } catch (Exception e) {
- // Table might already be removed if task state was unknown; ignore
missing table errors
- String message = e.getMessage();
- if (message == null || !message.contains("does not exist")) {
- throw e;
- }
- }
+ String deleteResponse =
getOrCreateAdminClient().getTableClient().deleteTable(tableName, null, null,
true);
+ assertEquals(deleteResponse, "{\"status\":\"Tables: [" + tableNameWithType
+ "] deleted\"}");
// delete task
- if (taskKnown) {
- getOrCreateAdminClient().getTaskClient().deleteTask(taskName, true);
+ try {
+ taskResourceManager().deleteTask(taskName, true);
+ } catch (Exception ignored) {
+ // Ignore if task no longer exists
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]