This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 276633679fa Decouple Flink connector from pinot-controller (#18054)
276633679fa is described below

commit 276633679fac8829d114a36bc7913c7cc0ed2e9e
Author: Xiang Fu <[email protected]>
AuthorDate: Thu Apr 9 17:52:33 2026 -0700

    Decouple Flink connector from pinot-controller (#18054)
    
    - Replace ControllerRequestClient with PinotAdminClient in Flink connector
    - Add typed TableConfig/Schema fetch APIs to pinot-java-client
    - Merge batch config overrides instead of appending (SegmentUploaderDefault 
requires exactly 1)
    - Pass tableType query param in getTableConfig for efficient filtering
    - Preserve controller path prefix in FlinkQuickStart for proxied deployments
    - Use SLF4J parameterized logging in PinotAdminClientExample
    - Update README with new PinotAdminClient API usage
    
    Co-authored-by: Claude Opus 4.6 (1M context) <[email protected]>
---
 .../client/admin/PinotAdminClientExample.java      |  79 ++++++-----
 .../java/org/apache/pinot/client/admin/README.md   |   8 +-
 .../pinot/client/admin/TableAdminClient.java       |   8 ++
 .../pinot/client/admin/PinotAdminClientTest.java   |  17 +++
 pinot-connectors/pinot-flink-connector/README.md   |  66 ++++++---
 pinot-connectors/pinot-flink-connector/pom.xml     |  11 +-
 .../pinot/connector/flink/FlinkQuickStart.java     |  27 +++-
 .../connector/flink/sink/PinotSinkFunction.java    |  57 ++++++++
 .../flink/sink/PinotSinkFunctionTest.java          |  84 +++++++++++
 .../api/PinotTableRestletResourceTest.java         | 153 +++++++++++----------
 10 files changed, 377 insertions(+), 133 deletions(-)

diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminClientExample.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminClientExample.java
index 3f567a0a5e2..ec376b3cf77 100644
--- 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminClientExample.java
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/PinotAdminClientExample.java
@@ -18,14 +18,22 @@
  */
 package org.apache.pinot.client.admin;
 
+import java.io.IOException;
 import java.util.Map;
 import java.util.Properties;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * Example demonstrating how to use PinotAdminClient.
  */
 public class PinotAdminClientExample {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotAdminClientExample.class);
+
   private PinotAdminClientExample() {
   }
 
@@ -34,7 +42,7 @@ public class PinotAdminClientExample {
     try (PinotAdminClient adminClient = new 
PinotAdminClient("localhost:9000")) {
       exampleBasicUsage(adminClient);
     } catch (Exception e) {
-      System.err.println("Error in basic usage example: " + e.getMessage());
+      LOGGER.error("Error in basic usage example", e);
     }
 
     // Example 2: Usage with basic authentication
@@ -42,102 +50,107 @@ public class PinotAdminClientExample {
       Properties properties = new Properties();
       properties.setProperty("pinot.admin.request.timeout.ms", "30000");
 
-      PinotAdminClient adminClient = new PinotAdminClient("localhost:9000", 
properties,
+      try (PinotAdminClient adminClient = new 
PinotAdminClient("localhost:9000", properties,
           PinotAdminAuthentication.AuthType.BASIC,
-          Map.of("username", "admin", "password", "password"));
-
-      exampleWithAuthentication(adminClient);
-      adminClient.close();
+          Map.of("username", "admin", "password", "password"))) {
+        exampleWithAuthentication(adminClient);
+      }
     } catch (Exception e) {
-      System.err.println("Error in authentication example: " + e.getMessage());
+      LOGGER.error("Error in authentication example", e);
     }
 
     // Example 3: Usage with bearer token authentication
     try {
       Properties properties = new Properties();
-      PinotAdminClient adminClient = new PinotAdminClient("localhost:9000", 
properties,
+      try (PinotAdminClient adminClient = new 
PinotAdminClient("localhost:9000", properties,
           PinotAdminAuthentication.AuthType.BEARER,
-          Map.of("token", "your-bearer-token"));
-
-      exampleWithBearerAuth(adminClient);
-      adminClient.close();
+          Map.of("token", "your-bearer-token"))) {
+        exampleWithBearerAuth(adminClient);
+      }
     } catch (Exception e) {
-      System.err.println("Error in bearer auth example: " + e.getMessage());
+      LOGGER.error("Error in bearer auth example", e);
     }
   }
 
   private static void exampleBasicUsage(PinotAdminClient adminClient)
       throws PinotAdminException {
-    System.out.println("=== Basic Usage Example ===");
+    LOGGER.info("=== Basic Usage Example ===");
 
     try {
       // List tables
       var tables = adminClient.getTableClient().listTables(null, null, null);
-      System.out.println("Tables: " + tables);
+      LOGGER.info("Tables: {}", tables);
 
       // List schemas
       var schemas = adminClient.getSchemaClient().listSchemaNames();
-      System.out.println("Schemas: " + schemas);
+      LOGGER.info("Schemas: {}", schemas);
 
       // List instances
       var instances = adminClient.getInstanceClient().listInstances();
-      System.out.println("Instances: " + instances);
+      LOGGER.info("Instances: {}", instances);
 
       // List tenants
       var tenants = adminClient.getTenantClient().listTenants();
-      System.out.println("Tenants: " + tenants);
+      LOGGER.info("Tenants: {}", tenants);
 
       // List task types
       var taskTypes = adminClient.getTaskClient().listTaskTypes();
-      System.out.println("Task types: " + taskTypes);
+      LOGGER.info("Task types: {}", taskTypes);
     } catch (PinotAdminException e) {
-      System.out.println("Admin operation failed: " + e.getMessage());
+      LOGGER.error("Admin operation failed", e);
     }
   }
 
   private static void exampleWithAuthentication(PinotAdminClient adminClient)
-      throws PinotAdminException {
-    System.out.println("=== Authentication Example ===");
+      throws IOException {
+    LOGGER.info("=== Authentication Example ===");
 
     try {
       // Get a specific table configuration
       String tableConfig = 
adminClient.getTableClient().getTableConfig("myTable");
-      System.out.println("Table config: " + tableConfig);
+      LOGGER.info("Table config: {}", tableConfig);
+
+      TableConfig offlineTableConfig =
+          adminClient.getTableClient().getTableConfigObjectForType("myTable", 
TableType.OFFLINE);
+      LOGGER.info("Typed table config: {}", offlineTableConfig.getTableName());
+
+      Schema schema = adminClient.getSchemaClient().getSchemaObject("myTable");
+      LOGGER.info("Typed schema: {}", schema.getSchemaName());
 
       // Validate a schema
       String schemaConfig =
           
"{\"schemaName\":\"testSchema\",\"dimensionFieldSpecs\":[{\"name\":\"id\",\"dataType\":\"INT\"}]}";
       String validationResult = 
adminClient.getSchemaClient().validateSchema(schemaConfig);
-      System.out.println("Schema validation: " + validationResult);
+      LOGGER.info("Schema validation: {}", validationResult);
     } catch (PinotAdminAuthenticationException e) {
-      System.out.println("Authentication failed: " + e.getMessage());
+      LOGGER.error("Authentication failed", e);
     } catch (PinotAdminNotFoundException e) {
-      System.out.println("Resource not found: " + e.getMessage());
+      LOGGER.error("Resource not found", e);
     } catch (PinotAdminException e) {
-      System.out.println("Admin operation failed: " + e.getMessage());
+      LOGGER.error("Admin operation failed", e);
     }
   }
 
   private static void exampleWithBearerAuth(PinotAdminClient adminClient)
       throws PinotAdminException {
-    System.out.println("=== Bearer Authentication Example ===");
+    LOGGER.info("=== Bearer Authentication Example ===");
 
     try {
       // Create a new schema
       String schemaConfig =
           
"{\"schemaName\":\"exampleSchema\",\"dimensionFieldSpecs\":[{\"name\":\"id\",\"dataType\":\"INT\"}]}";
       String createResult = 
adminClient.getSchemaClient().createSchema(schemaConfig);
-      System.out.println("Schema creation: " + createResult);
+      LOGGER.info("Schema creation: {}", createResult);
 
       // Get instance information
       var liveInstances = adminClient.getInstanceClient().listLiveInstances();
-      System.out.println("Live instances: " + liveInstances);
+      LOGGER.info("Live instances: {}", liveInstances);
     } catch (PinotAdminAuthenticationException e) {
-      System.out.println("Authentication failed: " + e.getMessage());
+      LOGGER.error("Authentication failed", e);
     } catch (PinotAdminValidationException e) {
-      System.out.println("Validation failed: " + e.getMessage());
+      LOGGER.error("Validation failed", e);
     } catch (PinotAdminException e) {
-      System.out.println("Admin operation failed: " + e.getMessage());
+      LOGGER.error("Admin operation failed", e);
     }
   }
 }
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/README.md
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/README.md
index bae4eb861b0..270397ec533 100644
--- 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/README.md
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/README.md
@@ -55,6 +55,9 @@ The admin client consists of:
 
 ```java
 import org.apache.pinot.client.admin.*;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
 
 // Create client without authentication
 try(PinotAdminClient adminClient = new PinotAdminClient("localhost:9000")){
@@ -64,6 +67,9 @@ List<String> tables = 
adminClient.getTableClient().listTables(null, null, null);
 // Get a specific table configuration
 String config = adminClient.getTableClient().getTableConfig("myTable");
 
+// Get a typed table configuration
+TableConfig offlineConfig = 
adminClient.getTableClient().getTableConfigObjectForType("myTable", 
TableType.OFFLINE);
+
 // List schemas
 List<String> schemas = adminClient.getSchemaClient().listSchemaNames();
 }
@@ -152,7 +158,7 @@ List<String> schemas = schemaClient.listSchemaNames();
 // Get schema configuration as JSON
 String schema = schemaClient.getSchema("mySchema");
 
-// Get a typed Schema object
+// Get a typed schema object
 Schema schemaObject = schemaClient.getSchemaObject("mySchema");
 
 // Create a new schema
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/TableAdminClient.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/TableAdminClient.java
index 99996098b22..e34cbe3d7f7 100644
--- 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/TableAdminClient.java
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/admin/TableAdminClient.java
@@ -126,6 +126,14 @@ public class TableAdminClient extends 
BaseServiceAdminClient {
     return response.toString();
   }
 
+  /**
+   * Gets the configuration for a specific raw table and type as a typed 
object.
+   */
+  public TableConfig getTableConfigObjectForType(String tableName, TableType 
tableType)
+      throws PinotAdminException, IOException {
+    return getTableConfigObject(tableName, tableType.name());
+  }
+
   /**
    * Gets the configuration for a specific table as a typed object.
    */
diff --git 
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/admin/PinotAdminClientTest.java
 
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/admin/PinotAdminClientTest.java
index edac49269c6..c6a9a820a01 100644
--- 
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/admin/PinotAdminClientTest.java
+++ 
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/admin/PinotAdminClientTest.java
@@ -139,6 +139,23 @@ public class PinotAdminClientTest {
         eq(HEADERS));
   }
 
+  @Test
+  public void testGetTypedTableConfig()
+      throws Exception {
+    TableConfig expectedTableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("tbl1").build();
+    String jsonResponse = "{\"OFFLINE\":" + 
JsonUtils.objectToString(expectedTableConfig) + "}";
+    JsonNode mockResponse = new ObjectMapper().readTree(jsonResponse);
+    lenient().when(_mockTransport.executeGet(anyString(), anyString(), any(), 
any()))
+        .thenReturn(mockResponse);
+
+    TableConfig tableConfig = 
_adminClient.getTableClient().getTableConfigObjectForType("tbl1", 
TableType.OFFLINE);
+
+    assertEquals(tableConfig.getTableName(), "tbl1_OFFLINE");
+    assertEquals(tableConfig.getTableType(), TableType.OFFLINE);
+    verify(_mockTransport).executeGet(eq(CONTROLLER_ADDRESS), 
eq("/tables/tbl1"), eq(Map.of("type", "OFFLINE")),
+        eq(HEADERS));
+  }
+
   @Test
   public void testListSchemas()
       throws Exception {
diff --git a/pinot-connectors/pinot-flink-connector/README.md 
b/pinot-connectors/pinot-flink-connector/README.md
index 6013b3a5e81..278eac86a43 100644
--- a/pinot-connectors/pinot-flink-connector/README.md
+++ b/pinot-connectors/pinot-flink-connector/README.md
@@ -30,17 +30,28 @@ StreamExecutionEnvironment execEnv = 
StreamExecutionEnvironment.getExecutionEnvi
 execEnv.setParallelism(2); // optional
 DataStream<Row> srcDs = execEnv.fromCollection(data).returns(TEST_TYPE_INFO)
 
-// Create a ControllerRequestClient to fetch Pinot schema and table config
-HttpClient httpClient = HttpClient.getInstance();
-ControllerRequestClient client = new ControllerRequestClient(
-ControllerRequestURLBuilder.baseUrl(DEFAULT_CONTROLLER_URL), httpClient);
+// Create a PinotAdminClient to fetch Pinot schema and table config
+String controllerUrl = "http://localhost:9000";
+URI controllerUri = URI.create(controllerUrl);
+String controllerAddress = controllerUri.getAuthority();
+String controllerPath = controllerUri.getPath();
+if (controllerPath != null && !controllerPath.isEmpty() && 
!"/".equals(controllerPath)) {
+  controllerAddress += controllerPath.endsWith("/") ? 
controllerPath.substring(0, controllerPath.length() - 1)
+      : controllerPath;
+}
+Properties properties = new Properties();
+properties.setProperty(PinotAdminTransport.ADMIN_TRANSPORT_SCHEME, 
controllerUri.getScheme());
 
-// fetch Pinot schema
-Schema schema = client.getSchemaClient().getSchemaObject("starbucksStores");
-// fetch Pinot table config
-TableConfig tableConfig = 
client.getTableClient().getTableConfigObject("starbucksStores", "OFFLINE");
-// create Flink Pinot Sink
-srcDs.addSink(new PinotSinkFunction<>(new 
PinotRowRecordConverter(TEST_TYPE_INFO), tableConfig, schema));
+try (PinotAdminClient client = new PinotAdminClient(controllerAddress, 
properties)) {
+  // fetch Pinot schema
+  Schema schema = client.getSchemaClient().getSchemaObject("starbucksStores");
+  // fetch Pinot table config
+  TableConfig tableConfig =
+      client.getTableClient().getTableConfigObjectForType("starbucksStores", 
TableType.OFFLINE);
+  // create Flink Pinot Sink
+  srcDs.addSink(new PinotSinkFunction<>(new 
PinotRowRecordConverter(TEST_TYPE_INFO), tableConfig, schema,
+      controllerUrl));
+}
 execEnv.execute();
 ```
 
@@ -48,22 +59,33 @@ execEnv.execute();
 ```java
 // Set up flink env and data source
 StreamExecutionEnvironment execEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
-execEnv.setParallelism(2); // mandatory for upsert tables wi
+execEnv.setParallelism(2); // mandatory for upsert tables
 DataStream<Row> srcDs = execEnv.fromCollection(data).returns(TEST_TYPE_INFO)
 
-// Create a ControllerRequestClient to fetch Pinot schema and table config
-HttpClient httpClient = HttpClient.getInstance();
-ControllerRequestClient client = new ControllerRequestClient(
-ControllerRequestURLBuilder.baseUrl(DEFAULT_CONTROLLER_URL), httpClient);
+// Create a PinotAdminClient to fetch Pinot schema and table config
+String controllerUrl = "http://localhost:9000";
+URI controllerUri = URI.create(controllerUrl);
+String controllerAddress = controllerUri.getAuthority();
+String controllerPath = controllerUri.getPath();
+if (controllerPath != null && !controllerPath.isEmpty() && 
!"/".equals(controllerPath)) {
+  controllerAddress += controllerPath.endsWith("/") ? 
controllerPath.substring(0, controllerPath.length() - 1)
+      : controllerPath;
+}
+Properties properties = new Properties();
+properties.setProperty(PinotAdminTransport.ADMIN_TRANSPORT_SCHEME, 
controllerUri.getScheme());
 
-// fetch Pinot schema
-Schema schema = client.getSchemaClient().getSchemaObject("starbucksStores");
-// fetch Pinot table config
-TableConfig tableConfig = 
client.getTableClient().getTableConfigObject("starbucksStores", "REALTIME");
+try (PinotAdminClient client = new PinotAdminClient(controllerAddress, 
properties)) {
+  // fetch Pinot schema
+  Schema schema = client.getSchemaClient().getSchemaObject("starbucksStores");
+  // fetch Pinot table config
+  TableConfig tableConfig =
+      client.getTableClient().getTableConfigObjectForType("starbucksStores", 
TableType.REALTIME);
 
-// create Flink Pinot Sink (partition it same as the realtime stream(e.g. 
kafka) in case of upsert tables)
-srcDs.partitionCustom((Partitioner<Integer>) (key, partitions) -> key % 
partitions, r -> (Integer) r.getField("primaryKey"))
-    .addSink(new PinotSinkFunction<>(new 
PinotRowRecordConverter(TEST_TYPE_INFO), tableConfig, schema));
+  // create Flink Pinot Sink (partition it same as the realtime stream(e.g. 
kafka) in case of upsert tables)
+  srcDs.partitionCustom((Partitioner<Integer>) (key, partitions) -> key % 
partitions, r -> (Integer) r.getField("primaryKey"))
+      .addSink(new PinotSinkFunction<>(new 
PinotRowRecordConverter(TEST_TYPE_INFO), tableConfig, schema,
+          controllerUrl));
+}
 execEnv.execute();
 
 ```
diff --git a/pinot-connectors/pinot-flink-connector/pom.xml 
b/pinot-connectors/pinot-flink-connector/pom.xml
index c9b9471f856..6e7ac204113 100644
--- a/pinot-connectors/pinot-flink-connector/pom.xml
+++ b/pinot-connectors/pinot-flink-connector/pom.xml
@@ -40,12 +40,16 @@
     </dependency>
     <dependency>
       <groupId>org.apache.pinot</groupId>
-      <artifactId>pinot-controller</artifactId>
+      <artifactId>pinot-core</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.pinot</groupId>
       <artifactId>pinot-segment-writer-file-based</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-segment-uploader-default</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>org.apache.flink</groupId>
@@ -63,6 +67,11 @@
     </dependency>
 
     <!-- Test Dependencies -->
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-controller</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.pinot</groupId>
       <artifactId>pinot-integration-test-base</artifactId>
diff --git 
a/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/FlinkQuickStart.java
 
b/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/FlinkQuickStart.java
index 312a5b34788..dd818cd3168 100644
--- 
a/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/FlinkQuickStart.java
+++ 
b/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/FlinkQuickStart.java
@@ -21,9 +21,11 @@ package org.apache.pinot.connector.flink;
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.net.URI;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -31,9 +33,11 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.types.Row;
 import org.apache.pinot.client.admin.PinotAdminClient;
+import org.apache.pinot.client.admin.PinotAdminTransport;
 import org.apache.pinot.connector.flink.common.FlinkRowGenericRowConverter;
 import org.apache.pinot.connector.flink.sink.PinotSinkFunction;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.Schema;
 
 
@@ -47,7 +51,7 @@ public final class FlinkQuickStart {
   public static final RowTypeInfo TEST_TYPE_INFO =
       new RowTypeInfo(new TypeInformation[]{Types.FLOAT, Types.FLOAT, 
Types.STRING, Types.STRING},
           new String[]{"lon", "lat", "address", "name"});
-  private static final String DEFAULT_CONTROLLER_ADDRESS = "localhost:9000";
+  private static final String DEFAULT_CONTROLLER_URL = "http://localhost:9000";
 
   private static List<Row> loadData()
       throws IOException {
@@ -76,11 +80,24 @@ public final class FlinkQuickStart {
     execEnv.setParallelism(2);
     DataStream<Row> srcDs = 
execEnv.fromCollection(data).returns(TEST_TYPE_INFO).keyBy(r -> r.getField(0));
 
-    try (PinotAdminClient client = new 
PinotAdminClient(DEFAULT_CONTROLLER_ADDRESS)) {
+    URI controllerUri = URI.create(DEFAULT_CONTROLLER_URL);
+    String controllerAddress = controllerUri.getAuthority();
+    String controllerPath = controllerUri.getPath();
+    if (controllerPath != null && !controllerPath.isEmpty() && 
!"/".equals(controllerPath)) {
+      controllerAddress += controllerPath.endsWith("/") ? 
controllerPath.substring(0, controllerPath.length() - 1)
+          : controllerPath;
+    }
+    Properties properties = new Properties();
+    properties.setProperty(PinotAdminTransport.ADMIN_TRANSPORT_SCHEME, 
controllerUri.getScheme());
+
+    try (PinotAdminClient client = new PinotAdminClient(controllerAddress, 
properties)) {
       Schema schema = 
client.getSchemaClient().getSchemaObject("starbucksStores");
-      TableConfig tableConfig = 
client.getTableClient().getTableConfigObject("starbucksStores", "OFFLINE");
-      srcDs.addSink(new PinotSinkFunction<>(new 
FlinkRowGenericRowConverter(TEST_TYPE_INFO), tableConfig, schema));
-      execEnv.execute();
+      TableConfig tableConfig =
+          
client.getTableClient().getTableConfigObjectForType("starbucksStores", 
TableType.OFFLINE);
+      srcDs.addSink(
+          new PinotSinkFunction<>(new 
FlinkRowGenericRowConverter(TEST_TYPE_INFO), tableConfig, schema,
+              DEFAULT_CONTROLLER_URL));
     }
+    execEnv.execute();
   }
 }
diff --git 
a/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/PinotSinkFunction.java
 
b/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/PinotSinkFunction.java
index e7204ef6b66..66ecfcd1ce7 100644
--- 
a/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/PinotSinkFunction.java
+++ 
b/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/PinotSinkFunction.java
@@ -19,6 +19,10 @@
 package org.apache.pinot.connector.flink.sink;
 
 import java.net.URI;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -31,7 +35,10 @@ import 
org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.pinot.connector.flink.common.PinotGenericRowConverter;
 import org.apache.pinot.plugin.segmentuploader.SegmentUploaderDefault;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 import org.apache.pinot.spi.ingestion.segment.uploader.SegmentUploader;
 import org.apache.pinot.spi.ingestion.segment.writer.SegmentWriter;
 import org.slf4j.Logger;
@@ -49,6 +56,7 @@ public class PinotSinkFunction<T> extends RichSinkFunction<T> 
implements Checkpo
   public static final long DEFAULT_SEGMENT_FLUSH_MAX_NUM_RECORDS = 500000;
   public static final int DEFAULT_EXECUTOR_POOL_SIZE = 5;
   public static final long DEFAULT_EXECUTOR_SHUTDOWN_WAIT_MS = 3000;
+  public static final String DEFAULT_OUTPUT_DIR_URI = "/tmp/pinotoutput";
 
   private static final long serialVersionUID = 1L;
   private static final Logger LOG = 
LoggerFactory.getLogger(PinotSinkFunction.class);
@@ -75,6 +83,16 @@ public class PinotSinkFunction<T> extends 
RichSinkFunction<T> implements Checkpo
     this(recordConverter, tableConfig, schema, 
DEFAULT_SEGMENT_FLUSH_MAX_NUM_RECORDS, DEFAULT_EXECUTOR_POOL_SIZE);
   }
 
+  /**
+   * Creates a sink using a table config fetched from the controller and 
injects the upload settings required by the
+   * Flink connector.
+   */
+  public PinotSinkFunction(PinotGenericRowConverter<T> recordConverter, 
TableConfig tableConfig, Schema schema,
+      String controllerBaseUrl) {
+    this(recordConverter, prepareTableConfigForSink(tableConfig, 
controllerBaseUrl), schema,
+        DEFAULT_SEGMENT_FLUSH_MAX_NUM_RECORDS, DEFAULT_EXECUTOR_POOL_SIZE);
+  }
+
   public PinotSinkFunction(PinotGenericRowConverter<T> recordConverter, 
TableConfig tableConfig, Schema schema,
       long segmentFlushMaxNumRecords, int executorPoolSize) {
     this(recordConverter, tableConfig, schema, segmentFlushMaxNumRecords, 
executorPoolSize, null, null);
@@ -92,6 +110,45 @@ public class PinotSinkFunction<T> extends 
RichSinkFunction<T> implements Checkpo
     _segmentUploadTimeMs = segmentUploadTimeMs;
   }
 
+  /**
+   * Applies the connector-specific batch-ingestion defaults needed for 
segment generation and upload.
+   */
+  static TableConfig prepareTableConfigForSink(TableConfig tableConfig, String 
controllerBaseUrl) {
+    Map<String, String> requiredBatchConfig = new HashMap<>();
+    requiredBatchConfig.put(BatchConfigProperties.PUSH_CONTROLLER_URI, 
controllerBaseUrl);
+    requiredBatchConfig.put(BatchConfigProperties.OUTPUT_DIR_URI, 
DEFAULT_OUTPUT_DIR_URI);
+
+    IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
+    if (ingestionConfig == null) {
+      ingestionConfig = new IngestionConfig();
+      ingestionConfig.setBatchIngestionConfig(
+          new 
BatchIngestionConfig(Collections.singletonList(requiredBatchConfig), "APPEND", 
"HOURLY"));
+      tableConfig.setIngestionConfig(ingestionConfig);
+      return tableConfig;
+    }
+
+    BatchIngestionConfig batchIngestionConfig = 
ingestionConfig.getBatchIngestionConfig();
+    if (batchIngestionConfig == null) {
+      ingestionConfig.setBatchIngestionConfig(
+          new 
BatchIngestionConfig(Collections.singletonList(requiredBatchConfig), "APPEND", 
"HOURLY"));
+      return tableConfig;
+    }
+
+    List<Map<String, String>> batchConfigMaps = 
batchIngestionConfig.getBatchConfigMaps();
+    if (batchConfigMaps == null || batchConfigMaps.isEmpty()) {
+      
batchIngestionConfig.setBatchConfigMaps(Collections.singletonList(requiredBatchConfig));
+    } else if (batchConfigMaps.size() > 1) {
+      throw new IllegalStateException(String.format(
+          "Flink connector requires exactly 1 batchConfigMap for table %s, got 
%d", tableConfig.getTableName(),
+          batchConfigMaps.size()));
+    } else {
+      Map<String, String> mergedBatchConfig = new 
HashMap<>(batchConfigMaps.get(0));
+      mergedBatchConfig.putAll(requiredBatchConfig);
+      
batchIngestionConfig.setBatchConfigMaps(Collections.singletonList(mergedBatchConfig));
+    }
+    return tableConfig;
+  }
+
   @Override
   public void open(Configuration parameters)
       throws Exception {
diff --git 
a/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkFunctionTest.java
 
b/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkFunctionTest.java
new file mode 100644
index 00000000000..9fcaea26e68
--- /dev/null
+++ 
b/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkFunctionTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.flink.sink;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+
+public class PinotSinkFunctionTest {
+  @Test
+  public void testPrepareTableConfigForSinkAddsControllerBatchConfig() {
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").build();
+
+    TableConfig result = 
PinotSinkFunction.prepareTableConfigForSink(tableConfig, 
"http://localhost:9000");
+
+    assertNotNull(result.getIngestionConfig());
+    BatchIngestionConfig batchIngestionConfig = 
result.getIngestionConfig().getBatchIngestionConfig();
+    assertNotNull(batchIngestionConfig);
+    Map<String, String> batchConfigMap = 
batchIngestionConfig.getBatchConfigMaps().get(0);
+    
assertEquals(batchConfigMap.get(BatchConfigProperties.PUSH_CONTROLLER_URI), 
"http://localhost:9000");
+    assertEquals(batchConfigMap.get(BatchConfigProperties.OUTPUT_DIR_URI),
+        PinotSinkFunction.DEFAULT_OUTPUT_DIR_URI);
+  }
+
+  @Test
+  public void 
testPrepareTableConfigForSinkMergesOverridesIntoExistingBatchConfig() {
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig
+        .setBatchIngestionConfig(new 
BatchIngestionConfig(List.of(Map.of("existing", "value")), "APPEND", "DAILY"));
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable")
+        .setIngestionConfig(ingestionConfig)
+        .build();
+
+    TableConfig result = 
PinotSinkFunction.prepareTableConfigForSink(tableConfig, 
"http://localhost:9000");
+
+    List<Map<String, String>> batchConfigMaps =
+        
result.getIngestionConfig().getBatchIngestionConfig().getBatchConfigMaps();
+    assertEquals(batchConfigMaps.size(), 1);
+    Map<String, String> mergedBatchConfigMap = batchConfigMaps.get(0);
+    assertEquals(mergedBatchConfigMap.get("existing"), "value");
+    
assertEquals(mergedBatchConfigMap.get(BatchConfigProperties.PUSH_CONTROLLER_URI),
 "http://localhost:9000");
+    
assertEquals(mergedBatchConfigMap.get(BatchConfigProperties.OUTPUT_DIR_URI),
+        PinotSinkFunction.DEFAULT_OUTPUT_DIR_URI);
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class,
+      expectedExceptionsMessageRegExp = ".*requires exactly 1 
batchConfigMap.*")
+  public void testPrepareTableConfigForSinkRejectsMultipleBatchConfigs() {
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setBatchIngestionConfig(
+        new BatchIngestionConfig(List.of(Map.of("first", "value"), 
Map.of("second", "value")), "APPEND", "DAILY"));
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable")
+        .setIngestionConfig(ingestionConfig)
+        .build();
+
+    PinotSinkFunction.prepareTableConfigForSink(tableConfig, 
"http://localhost:9000");
+  }
+}
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
index 9d86f827a03..cb29afe3d20 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
@@ -41,6 +41,7 @@ import org.apache.pinot.client.admin.ZookeeperAdminClient;
 import org.apache.pinot.common.restlet.resources.RebalanceResult;
 import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
+import 
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext;
 import org.apache.pinot.controller.helix.core.minion.TaskSchedulingInfo;
@@ -1204,13 +1205,15 @@ public class PinotTableRestletResourceTest extends 
ControllerTest {
   public void testTableTasksValidationWithDanglingTasks()
       throws Exception {
     String tableName = "testTableTasksValidationWithDangling";
+    String tableNameWithType = tableName + "_OFFLINE";
+    String taskType = MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE;
     DEFAULT_INSTANCE.addDummySchema(tableName);
 
     TableConfig offlineTableConfig = getOfflineTableBuilder(tableName)
         .setTaskConfig(new TableTaskConfig(Map.of(
-            MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE,
+            taskType,
             Map.of(PinotTaskManager.SCHEDULE_KEY, "0 */10 * ? * * *",
-                CommonConstants.TABLE_NAME, tableName + "_OFFLINE"))))
+                CommonConstants.TABLE_NAME, tableNameWithType))))
         .build();
 
     // First create the table successfully
@@ -1219,27 +1222,27 @@ public class PinotTableRestletResourceTest extends 
ControllerTest {
     // Create a task manually to simulate dangling task
     PinotTaskManager taskManager = 
DEFAULT_INSTANCE.getControllerStarter().getTaskManager();
     TaskSchedulingContext context = new TaskSchedulingContext();
-    context.setTablesToSchedule(Set.of(tableName + "_OFFLINE"));
+    context.setTablesToSchedule(Set.of(tableNameWithType));
     Map<String, TaskSchedulingInfo> taskInfo = 
taskManager.scheduleTasks(context);
     String taskName = 
taskInfo.values().iterator().next().getScheduledTaskNames().get(0);
-    waitForTaskState(taskName, TaskState.IN_PROGRESS);
-    boolean taskKnown = fetchTaskState(taskName) != null;
+    waitForTaskToBecomeActiveForCleanup(taskType, tableNameWithType, taskName);
 
     // Now try to create another table with same name (simulating re-creation 
with dangling tasks)
     deleteTable(tableName, null, true);
 
     try {
       createTable(offlineTableConfig.toJsonString());
-      if (taskKnown) {
-        fail("Table creation should fail when dangling tasks exist");
-      }
+      fail("Table creation should fail when dangling tasks exist");
     } catch (IOException e) {
-      if (taskKnown) {
-        assertTrue(e.getMessage().contains("The table has dangling task 
data"));
-      }
+      assertTrue(e.getMessage().contains("The table has dangling task data"));
     }
 
     // Clean up any remaining tasks
+    try {
+      taskResourceManager().deleteTask(taskName, true);
+    } catch (Exception ignored) {
+      // Ignore if task no longer exists
+    }
     try {
       deleteTable(tableName, null, true);
     } catch (Exception ignored) {
@@ -1268,13 +1271,15 @@ public class PinotTableRestletResourceTest extends 
ControllerTest {
   public void testTableTasksCleanupWithNonActiveTasks()
       throws Exception {
     String tableName = "testTableTasksCleanup";
+    String tableNameWithType = tableName + "_OFFLINE";
+    String taskType = MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE;
     DEFAULT_INSTANCE.addDummySchema(tableName);
 
     TableConfig offlineTableConfig = getOfflineTableBuilder(tableName)
         .setTaskConfig(new TableTaskConfig(Map.of(
-            MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE,
+            taskType,
             Map.of(PinotTaskManager.SCHEDULE_KEY, "0 */10 * ? * * *",
-                CommonConstants.TABLE_NAME, tableName + "_OFFLINE"))))
+                CommonConstants.TABLE_NAME, tableNameWithType))))
         .build();
 
     // Create table
@@ -1283,65 +1288,82 @@ public class PinotTableRestletResourceTest extends 
ControllerTest {
     // Create some completed tasks
     PinotTaskManager taskManager = 
DEFAULT_INSTANCE.getControllerStarter().getTaskManager();
     TaskSchedulingContext context = new TaskSchedulingContext();
-    context.setTablesToSchedule(Set.of(tableName + "_OFFLINE"));
-    Map<String, TaskSchedulingInfo> taskInfo = 
taskManager.scheduleTasks(context);
-    String taskName = 
taskInfo.values().iterator().next().getScheduledTaskNames().get(0);
-    waitForTaskState(taskName, TaskState.IN_PROGRESS);
-    boolean taskKnown = fetchTaskState(taskName) != null;
-
-    if (taskKnown) {
-      // stop the task queue to abort the task
-      
taskClient().stopTaskQueue(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE);
-      waitForTaskState(taskName, TaskState.STOPPED);
-      // resume the task queue again to avoid affecting other tests
-      
taskClient().resumeTaskQueue(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE);
+    context.setTablesToSchedule(Set.of(tableNameWithType));
+    taskManager.scheduleTasks(context);
+    taskResourceManager().stopTaskQueue(taskType);
+    try {
+      waitForTaskQueueState(taskType, TaskState.STOPPED);
+
+      // Delete table - should succeed and clean up tasks once the task stop 
is visible to cleanup.
+      String deleteResponse = deleteTableWhenNoActiveTasksRemain(tableName);
+      assertEquals(deleteResponse, "{\"status\":\"Tables: [" + 
tableNameWithType + "] deleted\"}");
+    } finally {
+      taskResourceManager().resumeTaskQueue(taskType);
     }
+  }
 
-    // Delete table - should succeed and clean up tasks
-    String deleteResponse = deleteTable(tableName);
-    assertEquals(deleteResponse, "{\"status\":\"Tables: [" + tableName + 
"_OFFLINE] deleted\"}");
+  private PinotHelixTaskResourceManager taskResourceManager() {
+    return 
DEFAULT_INSTANCE.getControllerStarter().getHelixTaskResourceManager();
+  }
+
+  private void waitForTaskQueueState(String taskType, TaskState expectedState) 
{
+    TestUtils.waitForCondition((aVoid) -> 
taskResourceManager().getTaskQueueState(taskType) == expectedState, 60_000,
+        "Task queue not in expected state " + expectedState);
   }
 
-  private void waitForTaskState(String taskName, TaskState expectedState) {
+  private void waitForTaskToBecomeActiveForCleanup(String taskType, String 
tableNameWithType, String taskName) {
     TestUtils.waitForCondition((aVoid) -> {
       try {
-        TaskState state = taskClient().getTaskState(taskName);
-        if (state == null) {
-          // If we cannot fetch state, treat IN_PROGRESS waits as best-effort 
no-ops.
-          return expectedState == TaskState.IN_PROGRESS;
-        }
-        // In test environments without a running minion, Helix can keep tasks 
in NOT_STARTED even though they
-        // are enqueued. Accept NOT_STARTED when we're only validating the 
presence of an in-flight task.
-        if (expectedState == TaskState.IN_PROGRESS && state == 
TaskState.NOT_STARTED) {
-          return true;
-        }
-        return state == expectedState;
+        return isTaskActiveForCleanup(taskType, tableNameWithType, taskName);
       } catch (Exception e) {
-        // If state lookup fails, consider IN_PROGRESS satisfied (task was 
enqueued) but surface other states.
-        return expectedState == TaskState.IN_PROGRESS;
+        return false;
       }
-    }, 60_000, "Task not scheduled to expected state " + expectedState);
+    }, 60_000, "Task not active for table cleanup: " + taskName);
   }
 
-  private TaskState fetchTaskState(String taskName) {
-    try {
-      return taskClient().getTaskState(taskName);
-    } catch (Exception e) {
-      return null;
+  private boolean isTaskActiveForCleanup(String taskType, String 
tableNameWithType, String taskName) {
+    TaskState taskState = taskResourceManager().getTaskStatesByTable(taskType, 
tableNameWithType).get(taskName);
+    return taskState == TaskState.IN_PROGRESS && 
taskResourceManager().getTaskCount(taskName).getRunning() > 0;
+  }
+
+  private String deleteTableWhenNoActiveTasksRemain(String tableName)
+      throws IOException {
+    long deadlineMs = System.currentTimeMillis() + 60_000L;
+    IOException lastActiveTaskError = null;
+    while (System.currentTimeMillis() < deadlineMs) {
+      try {
+        return deleteTable(tableName);
+      } catch (IOException e) {
+        String message = e.getMessage();
+        if (message == null || !message.contains("active running tasks")) {
+          throw e;
+        }
+        lastActiveTaskError = e;
+        try {
+          Thread.sleep(100L);
+        } catch (InterruptedException interruptedException) {
+          Thread.currentThread().interrupt();
+          throw new IOException("Interrupted while waiting for task cleanup to 
finish", interruptedException);
+        }
+      }
     }
+    throw lastActiveTaskError != null ? lastActiveTaskError
+        : new IOException("Timed out waiting for table deletion to succeed");
   }
 
   @Test
   public void testTableTasksCleanupWithActiveTasks()
       throws Exception {
     String tableName = "testTableTasksCleanupActive";
+    String tableNameWithType = tableName + "_OFFLINE";
+    String taskType = MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE;
     DEFAULT_INSTANCE.addDummySchema(tableName);
 
     TableConfig offlineTableConfig = getOfflineTableBuilder(tableName)
         .setTaskConfig(new TableTaskConfig(Map.of(
-            MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE,
+            taskType,
             Map.of(PinotTaskManager.SCHEDULE_KEY, "0 */10 * ? * * *",
-                CommonConstants.TABLE_NAME, tableName + "_OFFLINE"))))
+                CommonConstants.TABLE_NAME, tableNameWithType))))
         .build();
 
     // Create table
@@ -1350,38 +1372,27 @@ public class PinotTableRestletResourceTest extends 
ControllerTest {
     // Create an active/in-progress task
     PinotTaskManager taskManager = 
DEFAULT_INSTANCE.getControllerStarter().getTaskManager();
     TaskSchedulingContext context = new TaskSchedulingContext();
-    context.setTablesToSchedule(Set.of(tableName + "_OFFLINE"));
+    context.setTablesToSchedule(Set.of(tableNameWithType));
     Map<String, TaskSchedulingInfo> taskInfo = 
taskManager.scheduleTasks(context);
     String taskName = 
taskInfo.values().iterator().next().getScheduledTaskNames().get(0);
-    waitForTaskState(taskName, TaskState.IN_PROGRESS);
-    boolean taskKnown = fetchTaskState(taskName) != null;
+    waitForTaskToBecomeActiveForCleanup(taskType, tableNameWithType, taskName);
     try {
       // Try to delete table without ignoring active tasks - should fail
       deleteTable(tableName);
-      if (taskKnown) {
-        fail("Table deletion should fail when active tasks exist");
-      }
+      fail("Table deletion should fail when active tasks exist");
     } catch (IOException e) {
-      if (taskKnown) {
-        assertTrue(e.getMessage().contains("The table has") && 
e.getMessage().contains("active running tasks"));
-      }
+      assertTrue(e.getMessage().contains("The table has") && 
e.getMessage().contains("active running tasks"));
     }
 
     // Delete table with ignoreActiveTasks flag - should succeed
-    try {
-      String deleteResponse = 
getOrCreateAdminClient().getTableClient().deleteTable(tableName, null, null, 
true);
-      assertEquals(deleteResponse, "{\"status\":\"Tables: [" + tableName + 
"_OFFLINE] deleted\"}");
-    } catch (Exception e) {
-      // Table might already be removed if task state was unknown; ignore 
missing table errors
-      String message = e.getMessage();
-      if (message == null || !message.contains("does not exist")) {
-        throw e;
-      }
-    }
+    String deleteResponse = 
getOrCreateAdminClient().getTableClient().deleteTable(tableName, null, null, 
true);
+    assertEquals(deleteResponse, "{\"status\":\"Tables: [" + tableNameWithType 
+ "] deleted\"}");
 
     // delete task
-    if (taskKnown) {
-      getOrCreateAdminClient().getTaskClient().deleteTask(taskName, true);
+    try {
+      taskResourceManager().deleteTask(taskName, true);
+    } catch (Exception ignored) {
+      // Ignore if task no longer exists
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to