This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a2cde564f6 List tables by taskType (#9049)
a2cde564f6 is described below
commit a2cde564f6821350854a1e78935354fed812d3b4
Author: Neha Pawar <[email protected]>
AuthorDate: Thu Jul 14 11:11:30 2022 -0700
List tables by taskType (#9049)
* List tables by taskType
* Fix test states
* Regression check older java
---
.../api/resources/PinotTableRestletResource.java | 57 +++++++----
.../api/PinotTableRestletResourceTest.java | 112 +++++++++++++++++++++
2 files changed, 148 insertions(+), 21 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index dfd3a11ac7..b8239aa532 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -36,11 +36,14 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
@@ -60,6 +63,7 @@ import javax.ws.rs.core.Response;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.AccessOption;
import org.apache.helix.ZNRecord;
@@ -243,45 +247,55 @@ public class PinotTableRestletResource {
@Path("/tables")
@ApiOperation(value = "Lists all tables in cluster", notes = "Lists all tables in cluster")
public String listTables(@ApiParam(value = "realtime|offline") @QueryParam("type") String tableTypeStr,
+ @ApiParam(value = "Task type") @QueryParam("taskType") String taskType,
@ApiParam(value = "name|creationTime|lastModifiedTime") @QueryParam("sortType") String sortTypeStr,
@ApiParam(value = "true|false") @QueryParam("sortAsc") @DefaultValue("true") boolean sortAsc) {
try {
- List<String> tableNames;
TableType tableType = null;
if (tableTypeStr != null) {
tableType = TableType.valueOf(tableTypeStr.toUpperCase());
}
SortType sortType = sortTypeStr != null ? SortType.valueOf(sortTypeStr.toUpperCase()) : SortType.NAME;
- if (tableType == null) {
- if (sortType == SortType.NAME) {
- tableNames = _pinotHelixResourceManager.getAllRawTables();
- } else {
- // NOTE: Need to read actual table names (with type suffix) when not sorting on name because we need to read
- // the stats for the ZK records
- tableNames = _pinotHelixResourceManager.getAllTables();
- }
- } else {
- if (tableType == TableType.REALTIME) {
- tableNames = _pinotHelixResourceManager.getAllRealtimeTables();
- } else {
- tableNames = _pinotHelixResourceManager.getAllOfflineTables();
+ List<String> tableNamesWithType = tableType == null ? _pinotHelixResourceManager.getAllTables()
+ : (tableType == TableType.REALTIME ? _pinotHelixResourceManager.getAllRealtimeTables()
+ : _pinotHelixResourceManager.getAllOfflineTables());
+
+ if (StringUtils.isNotBlank(taskType)) {
+ Set<String> tableNamesForTaskType = new HashSet<>();
+ for (String tableNameWithType : tableNamesWithType) {
+ TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig != null && tableConfig.getTaskConfig() != null && tableConfig.getTaskConfig()
+ .isTaskTypeEnabled(taskType)) {
+ tableNamesForTaskType.add(tableNameWithType);
+ }
}
+ tableNamesWithType.retainAll(tableNamesForTaskType);
}
+ List<String> tableNames;
if (sortType == SortType.NAME) {
- tableNames.sort(sortAsc ? null : Comparator.reverseOrder());
+ if (tableType == null && StringUtils.isBlank(taskType)) {
+ List<String> rawTableNames = tableNamesWithType.stream().map(TableNameBuilder::extractRawTableName).distinct()
+ .collect(Collectors.toList());
+ rawTableNames.sort(sortAsc ? null : Comparator.reverseOrder());
+ tableNames = rawTableNames;
+ } else {
+ tableNamesWithType.sort(sortAsc ? null : Comparator.reverseOrder());
+ tableNames = tableNamesWithType;
+ }
} else {
int sortFactor = sortAsc ? 1 : -1;
ZkHelixPropertyStore<ZNRecord> propertyStore = _pinotHelixResourceManager.getPropertyStore();
- int numTables = tableNames.size();
+ int numTables = tableNamesWithType.size();
List<String> zkPaths = new ArrayList<>(numTables);
- for (String tableNameWithType : tableNames) {
+ for (String tableNameWithType : tableNamesWithType) {
zkPaths.add(ZKMetadataProvider.constructPropertyStorePathForResourceConfig(tableNameWithType));
}
Stat[] stats = propertyStore.getStats(zkPaths, AccessOption.PERSISTENT);
for (int i = 0; i < numTables; i++) {
- Preconditions.checkState(stats[i] != null, "Failed to read ZK stats for table: %s", tableNames.get(i));
+ Preconditions.checkState(stats[i] != null, "Failed to read ZK stats for table: %s",
+ tableNamesWithType.get(i));
}
IntComparator comparator;
if (sortType == SortType.CREATIONTIME) {
@@ -295,11 +309,12 @@ public class PinotTableRestletResource {
stats[i] = stats[j];
stats[j] = tempStat;
- String tempTableName = tableNames.get(i);
- tableNames.set(i, tableNames.get(j));
- tableNames.set(j, tempTableName);
+ String tempTableName = tableNamesWithType.get(i);
+ tableNamesWithType.set(i, tableNamesWithType.get(j));
+ tableNamesWithType.set(j, tempTableName);
};
Arrays.quickSort(0, numTables, comparator, swapper);
+ tableNames = tableNamesWithType;
}
return JsonUtils.newObjectNode().set("tables", JsonUtils.objectToJsonNode(tableNames)).toString();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
index 5f98e864a6..5345c935c1 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java
@@ -18,12 +18,16 @@
*/
package org.apache.pinot.controller.api;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.pinot.controller.api.resources.TableAndSchemaConfig;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
@@ -354,6 +358,114 @@ public class PinotTableRestletResourceTest {
}
}
+ private void deleteAllTables()
+ throws IOException {
+ List<String> tables = getTableNames(_createTableUrl + "?type=offline");
+ tables.addAll(getTableNames(_createTableUrl + "?type=realtime"));
+ for (String tableName : tables) {
+ ControllerTest.sendDeleteRequest(TEST_INSTANCE.getControllerRequestURLBuilder().forTableDelete(tableName));
+ }
+ }
+
+ @Test
+ public void testListTables()
+ throws Exception {
+ deleteAllTables();
+ List<String> tables = getTableNames(_createTableUrl);
+ Assert.assertTrue(tables.isEmpty());
+
+ // post 2 offline, 1 realtime
+ String rawTableName1 = "pqr";
+ TableConfig offlineTableConfig1 = _offlineBuilder.setTableName(rawTableName1).build();
+ ControllerTest.sendPostRequest(_createTableUrl, offlineTableConfig1.toJsonString());
+ TEST_INSTANCE.addDummySchema(rawTableName1);
+ TableConfig realtimeTableConfig1 = _realtimeBuilder.setTableName(rawTableName1).setNumReplicas(2).build();
+ ControllerTest.sendPostRequest(_createTableUrl, realtimeTableConfig1.toJsonString());
+ String rawTableName2 = "abc";
+ TableConfig offlineTableConfig2 = _offlineBuilder.setTableName(rawTableName2).build();
+ ControllerTest.sendPostRequest(_createTableUrl, offlineTableConfig2.toJsonString());
+
+ // list
+ tables = getTableNames(_createTableUrl);
+ Assert.assertEquals(tables, Lists.newArrayList("abc", "pqr"));
+ tables = getTableNames(_createTableUrl + "?sortAsc=false");
+ Assert.assertEquals(tables, Lists.newArrayList("pqr", "abc"));
+ tables = getTableNames(_createTableUrl + "?sortType=creationTime");
+ Assert.assertEquals(tables, Lists.newArrayList("pqr_OFFLINE", "pqr_REALTIME", "abc_OFFLINE"));
+ tables = getTableNames(_createTableUrl + "?sortType=creationTime&sortAsc=false");
+ Assert.assertEquals(tables, Lists.newArrayList("abc_OFFLINE", "pqr_REALTIME", "pqr_OFFLINE"));
+ tables = getTableNames(_createTableUrl + "?sortType=lastModifiedTime");
+ Assert.assertEquals(tables, Lists.newArrayList("pqr_OFFLINE", "pqr_REALTIME", "abc_OFFLINE"));
+ tables = getTableNames(_createTableUrl + "?sortType=lastModifiedTime&sortAsc=false");
+ Assert.assertEquals(tables, Lists.newArrayList("abc_OFFLINE", "pqr_REALTIME", "pqr_OFFLINE"));
+
+ // type
+ tables = getTableNames(_createTableUrl + "?type=realtime");
+ Assert.assertEquals(tables, Lists.newArrayList("pqr_REALTIME"));
+ tables = getTableNames(_createTableUrl + "?type=offline");
+ Assert.assertEquals(tables, Lists.newArrayList("abc_OFFLINE", "pqr_OFFLINE"));
+ tables = getTableNames(_createTableUrl + "?type=offline&sortAsc=false");
+ Assert.assertEquals(tables, Lists.newArrayList("pqr_OFFLINE", "abc_OFFLINE"));
+ tables = getTableNames(_createTableUrl + "?type=offline&sortType=creationTime");
+ Assert.assertEquals(tables, Lists.newArrayList("pqr_OFFLINE", "abc_OFFLINE"));
+ tables = getTableNames(_createTableUrl + "?type=offline&sortType=creationTime&sortAsc=false");
+ Assert.assertEquals(tables, Lists.newArrayList("abc_OFFLINE", "pqr_OFFLINE"));
+ tables = getTableNames(_createTableUrl + "?type=offline&sortType=lastModifiedTime");
+ Assert.assertEquals(tables, Lists.newArrayList("pqr_OFFLINE", "abc_OFFLINE"));
+ tables = getTableNames(_createTableUrl + "?type=offline&sortType=lastModifiedTime&sortAsc=false");
+ Assert.assertEquals(tables, Lists.newArrayList("abc_OFFLINE", "pqr_OFFLINE"));
+
+ // update taskType for abc_OFFLINE
+ Map<String, Map<String, String>> taskTypeMap = new HashMap<>();
+ taskTypeMap.put(MinionConstants.MergeRollupTask.TASK_TYPE, new HashMap<>());
+ offlineTableConfig2.setTaskConfig(new TableTaskConfig(taskTypeMap));
+ JsonUtils.stringToJsonNode(ControllerTest.sendPutRequest(
+ TEST_INSTANCE.getControllerRequestURLBuilder().forUpdateTableConfig(rawTableName2),
+ offlineTableConfig2.toJsonString()));
+ // update for pqr_REALTIME
+ taskTypeMap = new HashMap<>();
+ taskTypeMap.put(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, new HashMap<>());
+ realtimeTableConfig1.setTaskConfig(new TableTaskConfig(taskTypeMap));
+ JsonUtils.stringToJsonNode(ControllerTest.sendPutRequest(
+ TEST_INSTANCE.getControllerRequestURLBuilder().forUpdateTableConfig(rawTableName1),
+ realtimeTableConfig1.toJsonString()));
+
+ // list lastModified, taskType
+ tables = getTableNames(_createTableUrl + "?sortType=lastModifiedTime");
+ Assert.assertEquals(tables, Lists.newArrayList("pqr_OFFLINE", "abc_OFFLINE", "pqr_REALTIME"));
+ tables = getTableNames(_createTableUrl + "?sortType=lastModifiedTime&sortAsc=false");
+ Assert.assertEquals(tables, Lists.newArrayList("pqr_REALTIME", "abc_OFFLINE", "pqr_OFFLINE"));
+ tables = getTableNames(_createTableUrl + "?taskType=MergeRollupTask");
+ Assert.assertEquals(tables, Lists.newArrayList("abc_OFFLINE"));
+ tables = getTableNames(_createTableUrl + "?taskType=MergeRollupTask&type=realtime");
+ Assert.assertTrue(tables.isEmpty());
+ tables = getTableNames(_createTableUrl + "?taskType=RealtimeToOfflineSegmentsTask");
+ Assert.assertEquals(tables, Lists.newArrayList("pqr_REALTIME"));
+
+ // update taskType for pqr_OFFLINE
+ taskTypeMap = new HashMap<>();
+ taskTypeMap.put(MinionConstants.MergeRollupTask.TASK_TYPE, new HashMap<>());
+ offlineTableConfig1.setTaskConfig(new TableTaskConfig(taskTypeMap));
+ JsonUtils.stringToJsonNode(ControllerTest.sendPutRequest(
+ TEST_INSTANCE.getControllerRequestURLBuilder().forUpdateTableConfig(rawTableName1),
+ offlineTableConfig1.toJsonString()));
+
+ // list lastModified, taskType
+ tables = getTableNames(_createTableUrl + "?taskType=MergeRollupTask");
+ Assert.assertEquals(tables, Lists.newArrayList("abc_OFFLINE", "pqr_OFFLINE"));
+ tables = getTableNames(_createTableUrl + "?taskType=MergeRollupTask&sortAsc=false");
+ Assert.assertEquals(tables, Lists.newArrayList("pqr_OFFLINE", "abc_OFFLINE"));
+ tables = getTableNames(_createTableUrl + "?taskType=MergeRollupTask&sortType=creationTime");
+ Assert.assertEquals(tables, Lists.newArrayList("pqr_OFFLINE", "abc_OFFLINE"));
+ }
+
+ private List<String> getTableNames(String url)
+ throws IOException {
+ JsonNode tablesJson = JsonUtils.stringToJsonNode(ControllerTest.sendGetRequest(url)).get("tables");
+ return JsonUtils.jsonNodeToObject(tablesJson, new TypeReference<List<String>>() {
+ });
+ }
+
@Test(expectedExceptions = IOException.class)
public void rebalanceNonExistentTable()
throws Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]