This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 856095392e4 Add logical table support to TLS integration tests (#17683)
856095392e4 is described below
commit 856095392e4220479787bd2da3b2beb28b7140bd
Author: Krishan Goyal <[email protected]>
AuthorDate: Thu Feb 12 12:27:22 2026 +0530
Add logical table support to TLS integration tests (#17683)
* Add logical table to TlsIntegrationTest for TLS routing coverage
Add createLogicalTableConfig() and createLogicalTable() to
BaseClusterIntegrationTest so subclasses can create a logical table
backed by the existing OFFLINE and REALTIME physical tables.
Override createLogicalTable() in TlsIntegrationTest to pass AUTH_HEADER,
reusing the config object from the base class. Schema creation is handled
independently in setUp(). Add testLogicalTableTlsRouting() to verify
logical table queries work over both the Pinot TLS connection and the
external broker TLS endpoint.
https://claude.ai/code/session_01WJtj1jpKYqaVn8RkT2NRVL
* Fix test case
---------
Co-authored-by: Claude <[email protected]>
---
.../tests/BaseClusterIntegrationTest.java | 43 ++++++++++++++++
.../tests/QueryQuotaClusterIntegrationTest.java | 3 +-
.../integration/tests/TlsIntegrationTest.java | 57 ++++++++++++++++++++++
3 files changed, 102 insertions(+), 1 deletion(-)
diff --git
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index ed23fda35cf..d65149beae7 100644
---
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -63,12 +63,16 @@ import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.data.LogicalTableConfig;
+import org.apache.pinot.spi.data.PhysicalTableConfig;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.TimeBoundaryConfig;
import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.stream.StreamDataServerStartable;
import org.apache.pinot.spi.utils.Enablement;
import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.LogicalTableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
@@ -84,6 +88,7 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
// Default settings
protected static final String DEFAULT_TABLE_NAME = "mytable";
+ protected static final String DEFAULT_LOGICAL_TABLE_NAME = "mytable_logical";
protected static final String DEFAULT_SCHEMA_NAME = "mytable";
protected static final String DEFAULT_SCHEMA_FILE_NAME =
"On_Time_On_Time_Performance_2014_100k_subset_nonulls.schema";
@@ -123,6 +128,10 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
return DEFAULT_TABLE_NAME;
}
+ /**
+ * Returns the name of the logical table used by this test.
+ * The default is {@link #DEFAULT_LOGICAL_TABLE_NAME}; subclasses may override it to use a different name.
+ */
+ protected String getLogicalTableName() {
+ return DEFAULT_LOGICAL_TABLE_NAME;
+ }
+
protected String getSchemaFileName() {
return DEFAULT_SCHEMA_FILE_NAME;
}
@@ -397,6 +406,40 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
return getTableConfigBuilder(TableType.REALTIME).build();
}
+  /**
+   * Builds the {@link LogicalTableConfig} for {@link #getLogicalTableName()}, referencing the OFFLINE and REALTIME
+   * physical tables derived from {@link #getTableName()}.
+   *
+   * <p>Each physical table is registered with a default {@link PhysicalTableConfig}, and both table names are passed
+   * as the {@code includedTables} parameter of a {@code "min"} time boundary config. This method only builds the
+   * config object; it does not send anything to the controller.
+   *
+   * @return the logical table config
+   */
+  protected LogicalTableConfig createLogicalTableConfig() {
+    String offlineName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
+    String realtimeName = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
+
+    Map<String, PhysicalTableConfig> physicalTables = new HashMap<>();
+    physicalTables.put(offlineName, new PhysicalTableConfig());
+    physicalTables.put(realtimeName, new PhysicalTableConfig());
+
+    LogicalTableConfigBuilder builder = new LogicalTableConfigBuilder();
+    builder.setTableName(getLogicalTableName());
+    builder.setBrokerTenant(getBrokerTenant());
+    builder.setRefOfflineTableName(offlineName);
+    builder.setRefRealtimeTableName(realtimeName);
+    builder.setPhysicalTableConfigMap(physicalTables);
+    // Every physical table is listed in the time boundary's includedTables parameter
+    builder.setTimeBoundaryConfig(
+        new TimeBoundaryConfig("min", Map.of("includedTables", physicalTables.keySet())));
+    return builder.build();
+  }
+
+  /**
+   * Creates the logical table on the controller by POSTing the config from {@link #createLogicalTableConfig()} to the
+   * logical-table-create URL.
+   *
+   * <p>The schema for the logical table should be created separately before calling this method.
+   *
+   * @throws IOException propagated from the POST request
+   */
+  protected void createLogicalTable()
+      throws IOException {
+    LogicalTableConfig config = createLogicalTableConfig();
+    String createUrl = _controllerRequestURLBuilder.forLogicalTableCreate();
+    sendPostRequest(createUrl, config.toSingleLineJsonString());
+  }
+
// TODO - Use this method to create table config for all table types to
avoid redundant code
protected TableConfigBuilder getTableConfigBuilder(TableType tableType) {
return new TableConfigBuilder(tableType)
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
index f91a4a9148c..3564961a852 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryQuotaClusterIntegrationTest.java
@@ -526,7 +526,8 @@ public class QueryQuotaClusterIntegrationTest extends
BaseClusterIntegrationTest
// to allow change propagation to QueryQuotaManager
}
- private static String getLogicalTableName() {
+ /**
+ * Overrides the base-class logical table name with the one used by this test.
+ */
+ @Override
+ protected String getLogicalTableName() {
return "logical_table";
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java
index daea554d158..f104fa3041e 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java
@@ -63,6 +63,7 @@ import org.apache.pinot.core.common.MinionConstants;
import
org.apache.pinot.integration.tests.access.CertBasedTlsChannelAccessControlFactory;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -116,6 +117,12 @@ public class TlsIntegrationTest extends
BaseClusterIntegrationTest {
addTableConfig(createRealtimeTableConfig(avroFiles.get(0)));
addTableConfig(createOfflineTableConfig());
+ // Create a logical table backed by the physical tables
+ Schema logicalTableSchema = createSchema(getSchemaFileName());
+ logicalTableSchema.setSchemaName(getLogicalTableName());
+ addSchema(logicalTableSchema);
+ createLogicalTable();
+
// Push data into Kafka
pushAvroIntoKafka(avroFiles);
waitForAllDocsLoaded(600_000L);
@@ -124,6 +131,8 @@ public class TlsIntegrationTest extends
BaseClusterIntegrationTest {
@AfterClass(alwaysRun = true)
public void tearDown()
throws Exception {
+ dropLogicalTable(getLogicalTableName());
+ dropOfflineTable(getTableName());
dropRealtimeTable(getTableName());
stopMinion();
stopServer();
@@ -269,6 +278,16 @@ public class TlsIntegrationTest extends
BaseClusterIntegrationTest {
sendPostRequest(_controllerRequestURLBuilder.forTableCreate(),
tableConfig.toJsonString(), AUTH_HEADER);
}
+  /**
+   * Creates the logical table on the controller, attaching {@code AUTH_HEADER} to the POST request.
+   *
+   * <p>The config is obtained from {@link #createLogicalTableConfig()}; only the extra header argument differs from
+   * the base-class implementation.
+   *
+   * @throws IOException propagated from the POST request
+   */
+  @Override
+  protected void createLogicalTable()
+      throws IOException {
+    LogicalTableConfig config = createLogicalTableConfig();
+    String createUrl = _controllerRequestURLBuilder.forLogicalTableCreate();
+    sendPostRequest(createUrl, config.toSingleLineJsonString(), AUTH_HEADER);
+  }
+
@Override
protected Connection getPinotConnection() {
if (_pinotConnection == null) {
@@ -283,6 +302,20 @@ public class TlsIntegrationTest extends
BaseClusterIntegrationTest {
return _pinotConnection;
}
+  /**
+   * Deletes the given logical table through the controller, attaching {@code AUTH_HEADER} to the DELETE request.
+   *
+   * @param logicalTableName name of the logical table to delete
+   * @throws IOException propagated from the DELETE request
+   */
+  @Override
+  public void dropLogicalTable(String logicalTableName)
+      throws IOException {
+    String deleteUrl = _controllerRequestURLBuilder.forLogicalTableDelete(logicalTableName);
+    sendDeleteRequest(deleteUrl, AUTH_HEADER);
+  }
+
+  /**
+   * Deletes the OFFLINE table for the given raw table name through the controller, attaching {@code AUTH_HEADER} to
+   * the DELETE request.
+   *
+   * @param tableName table name to which the OFFLINE type suffix is applied before deletion
+   * @throws IOException propagated from the DELETE request
+   */
+  @Override
+  public void dropOfflineTable(String tableName)
+      throws IOException {
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+    String deleteUrl = _controllerRequestURLBuilder.forTableDelete(offlineTableName);
+    sendDeleteRequest(deleteUrl, AUTH_HEADER);
+  }
+
@Override
public void dropRealtimeTable(String tableName)
throws IOException {
@@ -505,6 +538,30 @@ public class TlsIntegrationTest extends
BaseClusterIntegrationTest {
}
}
+  /**
+   * Verifies that a count query against the logical table returns a positive count over both TLS paths: the Pinot
+   * client connection from {@link #getPinotConnection()} and a direct HTTPS POST to the external broker port.
+   *
+   * @throws Exception if either query path fails
+   */
+  @Test
+  public void testLogicalTableTlsRouting()
+      throws Exception {
+    String query = "SELECT count(*) FROM " + getLogicalTableName();
+
+    // Query via Pinot connection (TLS-enabled)
+    ResultSetGroup resultSetGroup = getPinotConnection().execute(query);
+    Assert.assertTrue(resultSetGroup.getResultSet(0).getLong(0) > 0);
+
+    // Query via external broker TLS endpoint, reusing the same SQL so both paths stay in sync
+    try (CloseableHttpClient client = makeClient(JKS, TLS_STORE_EMPTY_JKS, TLS_STORE_JKS)) {
+      HttpPost request = new HttpPost("https://localhost:" + _externalBrokerPort + "/query/sql");
+      request.addHeader(CLIENT_HEADER);
+      request.setEntity(new StringEntity("{\"sql\":\"" + query + "\"}"));
+      try (CloseableHttpResponse response = client.execute(request)) {
+        Assert.assertEquals(response.getCode(), 200);
+        JsonNode resultTable =
+            JsonUtils.inputStreamToJsonNode(response.getEntity().getContent()).get("resultTable");
+        // get(...) yields null for a missing field; assert explicitly so the failure is readable rather than an NPE
+        Assert.assertNotNull(resultTable, "Broker response has no resultTable for query: " + query);
+        Assert.assertTrue(resultTable.get("rows").get(0).get(0).longValue() > 0);
+      }
+    }
+  }
+
@Test
public void testJDBCClient()
throws Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]