This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7b69d094be Refactoring the upsert compaction related code (#12275)
7b69d094be is described below
commit 7b69d094be7bc410d4bf8fe15e4c30233421e300
Author: Seunghyun Lee <[email protected]>
AuthorDate: Mon Jan 22 00:55:40 2024 -0800
Refactoring the upsert compaction related code (#12275)
* Refactoring the upsert compaction related code
1. Fix the issue with fetching validDocId metadata for a table with
a large number of segments. (Added POST API with list of segments
to be part of the request body)
2. Added POST support for MultiHttpRequest to cover 1.
3. Added GET /tables/<tableName>/validDocIdMetadata API on the controller
for improving debuggability.
* Addressing comments
---
.../requesthandler/BaseBrokerRequestHandler.java | 4 +-
.../apache/pinot/common/http/MultiHttpRequest.java | 51 +++++--
.../restlet/resources/ValidDocIdMetadataInfo.java | 56 +++++++
.../pinot/common/http/MultiHttpRequestTest.java | 106 ++++++++++++--
.../api/resources/PinotRunningQueryResource.java | 2 +-
.../api/resources/PinotTableRestletResource.java | 37 +++++
.../controller/util/CompletionServiceHelper.java | 52 ++++++-
.../util/ServerSegmentMetadataReader.java | 161 ++++++++++++++++++---
.../pinot/controller/util/TableMetadataReader.java | 30 +++-
.../pinot/plugin/minion/tasks/MinionTaskUtils.java | 41 ++++++
.../UpsertCompactionTaskExecutor.java | 141 +-----------------
.../UpsertCompactionTaskGenerator.java | 106 ++++----------
.../UpsertCompactionTaskExecutorTest.java | 7 +-
.../UpsertCompactionTaskGeneratorTest.java | 86 +++--------
.../readers/CompactedPinotSegmentRecordReader.java | 106 ++++++++++++++
.../pinot/server/api/resources/TablesResource.java | 57 ++++++--
.../pinot/server/api/TablesResourceTest.java | 22 +++
17 files changed, 715 insertions(+), 350 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index b2da166510..cc640110fc 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -209,9 +209,9 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
// TODO: Use different global query id for OFFLINE and REALTIME table
after releasing 0.12.0. See QueryIdUtils for
// details
String globalQueryId = getGlobalQueryId(requestId);
- List<String> serverUrls = new ArrayList<>();
+ List<Pair<String, String>> serverUrls = new ArrayList<>();
for (ServerInstance serverInstance : queryServers._servers) {
- serverUrls.add(String.format("%s/query/%s",
serverInstance.getAdminEndpoint(), globalQueryId));
+ serverUrls.add(Pair.of(String.format("%s/query/%s",
serverInstance.getAdminEndpoint(), globalQueryId), null));
}
LOGGER.debug("Cancelling the query: {} via server urls: {}",
queryServers._query, serverUrls);
CompletionService<MultiHttpRequestResponse> completionService =
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/http/MultiHttpRequest.java
b/pinot-common/src/main/java/org/apache/pinot/common/http/MultiHttpRequest.java
index a28674e2ad..73efb2c598 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/http/MultiHttpRequest.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/http/MultiHttpRequest.java
@@ -19,6 +19,7 @@
package org.apache.pinot.common.http;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionService;
@@ -26,11 +27,14 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.function.Function;
import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.conn.HttpClientConnectionManager;
+import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
@@ -66,14 +70,29 @@ public class MultiHttpRequest {
* @return instance of CompletionService. Completion service will provide
* results as they arrive. The order is NOT same as the order of URLs
*/
- public CompletionService<MultiHttpRequestResponse> execute(List<String> urls,
+ public CompletionService<MultiHttpRequestResponse> executeGet(List<String>
urls,
@Nullable Map<String, String> requestHeaders, int timeoutMs) {
- return execute(urls, requestHeaders, timeoutMs, "GET", HttpGet::new);
+ List<Pair<String, String>> urlsAndRequestBodies = new ArrayList<>();
+ urls.forEach(url -> urlsAndRequestBodies.add(Pair.of(url, "")));
+ return execute(urlsAndRequestBodies, requestHeaders, timeoutMs, "GET",
HttpGet::new);
+ }
+
+ /**
+ * POST urls in parallel using the executor service.
+ * @param urlsAndRequestBodies absolute URLs and request bodies to POST
+ * @param requestHeaders headers to set when making the request
+ * @param timeoutMs timeout in milliseconds for each POST request
+ * @return instance of CompletionService. Completion service will provide
+ * results as they arrive. The order is NOT same as the order of URLs
+ */
+ public CompletionService<MultiHttpRequestResponse>
executePost(List<Pair<String, String>> urlsAndRequestBodies,
+ @Nullable Map<String, String> requestHeaders, int timeoutMs) {
+ return execute(urlsAndRequestBodies, requestHeaders, timeoutMs, "POST",
HttpPost::new);
}
/**
* Execute certain http method on the urls in parallel using the executor
service.
- * @param urls absolute URLs to execute the http method
+ * @param urlsAndRequestBodies absolute URLs and request bodies to execute the http method
* @param requestHeaders headers to set when making the request
* @param timeoutMs timeout in milliseconds for each http request
* @param httpMethodName the name of the http method like GET, DELETE etc.
@@ -81,22 +100,28 @@ public class MultiHttpRequest {
* @return instance of CompletionService. Completion service will provide
* results as they arrive. The order is NOT same as the order of URLs
*/
- public <T extends HttpRequestBase>
CompletionService<MultiHttpRequestResponse> execute(List<String> urls,
- @Nullable Map<String, String> requestHeaders, int timeoutMs, String
httpMethodName,
- Function<String, T> httpRequestBaseSupplier) {
+ public <T extends HttpRequestBase>
CompletionService<MultiHttpRequestResponse> execute(
+ List<Pair<String, String>> urlsAndRequestBodies, @Nullable Map<String,
String> requestHeaders, int timeoutMs,
+ String httpMethodName, Function<String, T> httpRequestBaseSupplier) {
// Create global request configuration
- RequestConfig defaultRequestConfig = RequestConfig.custom()
- .setConnectionRequestTimeout(timeoutMs)
- .setSocketTimeout(timeoutMs).build(); // setting the socket
+ RequestConfig defaultRequestConfig =
+
RequestConfig.custom().setConnectionRequestTimeout(timeoutMs).setSocketTimeout(timeoutMs)
+ .build(); // setting the socket
- HttpClientBuilder httpClientBuilder = HttpClients.custom()
-
.setConnectionManager(_connectionManager).setDefaultRequestConfig(defaultRequestConfig);
+ HttpClientBuilder httpClientBuilder =
+
HttpClients.custom().setConnectionManager(_connectionManager).setDefaultRequestConfig(defaultRequestConfig);
CompletionService<MultiHttpRequestResponse> completionService = new
ExecutorCompletionService<>(_executor);
CloseableHttpClient client = httpClientBuilder.build();
- for (String url : urls) {
+ for (Pair<String, String> pair : urlsAndRequestBodies) {
completionService.submit(() -> {
- T httpMethod = httpRequestBaseSupplier.apply(url);
+ String url = pair.getLeft();
+ String body = pair.getRight();
+ HttpRequestBase httpMethod = httpRequestBaseSupplier.apply(url);
+ // If the http method is POST, set the request body
+ if (httpMethod instanceof HttpPost) {
+ ((HttpPost) httpMethod).setEntity(new StringEntity(body));
+ }
if (requestHeaders != null) {
requestHeaders.forEach(((HttpRequestBase) httpMethod)::setHeader);
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdMetadataInfo.java
b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdMetadataInfo.java
new file mode 100644
index 0000000000..475ba91432
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ValidDocIdMetadataInfo.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.restlet.resources;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ValidDocIdMetadataInfo {
+ private final String _segmentName;
+ private final long _totalValidDocs;
+ private final long _totalInvalidDocs;
+ private final long _totalDocs;
+
+ public ValidDocIdMetadataInfo(@JsonProperty("segmentName") String
segmentName,
+ @JsonProperty("totalValidDocs") long totalValidDocs,
@JsonProperty("totalInvalidDocs") long totalInvalidDocs,
+ @JsonProperty("totalDocs") long totalDocs) {
+ _segmentName = segmentName;
+ _totalValidDocs = totalValidDocs;
+ _totalInvalidDocs = totalInvalidDocs;
+ _totalDocs = totalDocs;
+ }
+
+ public String getSegmentName() {
+ return _segmentName;
+ }
+
+ public long getTotalValidDocs() {
+ return _totalValidDocs;
+ }
+
+ public long getTotalInvalidDocs() {
+ return _totalInvalidDocs;
+ }
+
+ public long getTotalDocs() {
+ return _totalDocs;
+ }
+}
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/http/MultiHttpRequestTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/http/MultiHttpRequestTest.java
index 61f58f78f6..292e2ec82e 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/http/MultiHttpRequestTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/http/MultiHttpRequestTest.java
@@ -32,6 +32,7 @@ import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
@@ -43,6 +44,30 @@ import org.testng.annotations.Test;
public class MultiHttpRequestTest {
+ class TestResult {
+ private final int _success;
+ private final int _errors;
+ private final int _timeouts;
+
+ public TestResult(int success, int errors, int timeouts) {
+ _success = success;
+ _errors = errors;
+ _timeouts = timeouts;
+ }
+
+ public int getSuccess() {
+ return _success;
+ }
+
+ public int getErrors() {
+ return _errors;
+ }
+
+ public int getTimeouts() {
+ return _timeouts;
+ }
+ }
+
private static final Logger LOGGER =
LoggerFactory.getLogger(MultiHttpRequest.class);
private static final String SUCCESS_MSG = "success";
private static final String ERROR_MSG = "error";
@@ -61,6 +86,7 @@ public class MultiHttpRequestTest {
startServer(_portStart, createHandler(SUCCESS_CODE, SUCCESS_MSG, 0));
startServer(_portStart + 1, createHandler(ERROR_CODE, ERROR_MSG, 0));
startServer(_portStart + 2, createHandler(SUCCESS_CODE, TIMEOUT_MSG,
TIMEOUT_MS));
+ startServer(_portStart + 3, createPostHandler(SUCCESS_CODE, SUCCESS_MSG,
0));
}
@AfterTest
@@ -90,6 +116,33 @@ public class MultiHttpRequestTest {
};
}
+ private HttpHandler createPostHandler(final int status, final String msg,
final int sleepTimeMs) {
+ return new HttpHandler() {
+ @Override
+ public void handle(HttpExchange httpExchange)
+ throws IOException {
+ if (sleepTimeMs > 0) {
+ try {
+ Thread.sleep(sleepTimeMs);
+ } catch (InterruptedException e) {
+ LOGGER.info("Handler interrupted during sleep");
+ }
+ }
+ if (httpExchange.getRequestMethod().equals("POST")) {
+ httpExchange.sendResponseHeaders(status, msg.length());
+ OutputStream responseBody = httpExchange.getResponseBody();
+ responseBody.write(msg.getBytes());
+ responseBody.close();
+ } else {
+ httpExchange.sendResponseHeaders(ERROR_CODE, ERROR_MSG.length());
+ OutputStream responseBody = httpExchange.getResponseBody();
+ responseBody.write(ERROR_MSG.getBytes());
+ responseBody.close();
+ }
+ }
+ };
+ }
+
private void startServer(int port, HttpHandler handler)
throws IOException {
final HttpServer server = HttpServer.create(new InetSocketAddress(port),
0);
@@ -104,22 +157,58 @@ public class MultiHttpRequestTest {
}
@Test
- public void testMultiGet() throws Exception {
- MultiHttpRequest mget =
- new MultiHttpRequest(Executors.newCachedThreadPool(), new
PoolingHttpClientConnectionManager());
+ public void testMultiGet() {
List<String> urls = Arrays.asList("http://localhost:" +
String.valueOf(_portStart) + URI_PATH,
"http://localhost:" + String.valueOf(_portStart + 1) + URI_PATH,
"http://localhost:" + String.valueOf(_portStart + 2) + URI_PATH,
// 2nd request to the same server
- "http://localhost:" + String.valueOf(_portStart) + URI_PATH);
+ "http://localhost:" + String.valueOf(_portStart) + URI_PATH,
+ "http://localhost:" + String.valueOf(_portStart + 3) + URI_PATH);
+
+ MultiHttpRequest mget =
+ new MultiHttpRequest(Executors.newCachedThreadPool(), new
PoolingHttpClientConnectionManager());
+
// timeout value needs to be less than 5000ms set above for
// third server
final int requestTimeoutMs = 1000;
- CompletionService<MultiHttpRequestResponse> completionService =
mget.execute(urls, null, requestTimeoutMs);
+ CompletionService<MultiHttpRequestResponse> completionService =
mget.executeGet(urls, null, requestTimeoutMs);
+
+ TestResult result = collectResult(completionService, urls.size());
+ Assert.assertEquals(result.getSuccess(), 2);
+ Assert.assertEquals(result.getErrors(), 2);
+ Assert.assertEquals(result.getTimeouts(), 1);
+ }
+
+ @Test
+ public void testMultiPost() {
+ List<Pair<String, String>> urlsAndRequestBodies =
+ List.of(Pair.of("http://localhost:" + String.valueOf(_portStart) +
URI_PATH, "b0"),
+ Pair.of("http://localhost:" + String.valueOf(_portStart + 1) +
URI_PATH, "b1"),
+ Pair.of("http://localhost:" + String.valueOf(_portStart + 2) +
URI_PATH, "b2"),
+ // 2nd request to the same server
+ Pair.of("http://localhost:" + String.valueOf(_portStart) +
URI_PATH, "b3"),
+ Pair.of("http://localhost:" + String.valueOf(_portStart + 3) +
URI_PATH, "b4"));
+
+ MultiHttpRequest mpost =
+ new MultiHttpRequest(Executors.newCachedThreadPool(), new
PoolingHttpClientConnectionManager());
+
+ // timeout value needs to be less than 5000ms set above for
+ // third server
+ final int requestTimeoutMs = 1000;
+ CompletionService<MultiHttpRequestResponse> completionService =
+ mpost.executePost(urlsAndRequestBodies, null, requestTimeoutMs);
+
+ TestResult result = collectResult(completionService,
urlsAndRequestBodies.size());
+ Assert.assertEquals(result.getSuccess(), 3);
+ Assert.assertEquals(result.getErrors(), 1);
+ Assert.assertEquals(result.getTimeouts(), 1);
+ }
+
+ private TestResult collectResult(CompletionService<MultiHttpRequestResponse>
completionService, int size) {
int success = 0;
int errors = 0;
int timeouts = 0;
- for (int i = 0; i < urls.size(); i++) {
+ for (int i = 0; i < size; i++) {
try (MultiHttpRequestResponse httpRequestResponse =
completionService.take().get()) {
if (httpRequestResponse.getResponse().getStatusLine().getStatusCode()
>= 300) {
errors++;
@@ -143,9 +232,6 @@ public class MultiHttpRequestTest {
errors++;
}
}
-
- Assert.assertEquals(success, 2);
- Assert.assertEquals(errors, 1);
- Assert.assertEquals(timeouts, 1);
+ return new TestResult(success, errors, timeouts);
}
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRunningQueryResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRunningQueryResource.java
index 8aa004b35b..29bdccf4eb 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRunningQueryResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRunningQueryResource.java
@@ -187,7 +187,7 @@ public class PinotRunningQueryResource {
}
LOGGER.debug("Getting running queries via broker urls: {}", brokerUrls);
CompletionService<MultiHttpRequestResponse> completionService =
- new MultiHttpRequest(_executor, _httpConnMgr).execute(brokerUrls,
requestHeaders, timeoutMs);
+ new MultiHttpRequest(_executor, _httpConnMgr).executeGet(brokerUrls,
requestHeaders, timeoutMs);
Map<String, Map<String, String>> queriesByBroker = new HashMap<>();
List<String> errMsgs = new ArrayList<>(brokerUrls.size());
for (int i = 0; i < brokerUrls.size(); i++) {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 5d35e1f0c3..d7894c08f7 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -950,6 +950,43 @@ public class PinotTableRestletResource {
return segmentsMetadata;
}
+ @GET
+ @Path("tables/{tableName}/validDocIdMetadata")
+ @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action =
Actions.Table.GET_METADATA)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Get the aggregate valid doc id metadata of all
segments for a table", notes = "Get the "
+ + "aggregate valid doc id metadata of all segments for a table")
+ public String getTableAggregateValidDocIdMetadata(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName,
+ @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String
tableTypeStr,
+ @ApiParam(value = "A list of segments", allowMultiple = true)
@QueryParam("segmentNames")
+ List<String> segmentNames) {
+ LOGGER.info("Received a request to fetch aggregate valid doc id metadata
for a table {}", tableName);
+ TableType tableType = Constants.validateTableType(tableTypeStr);
+ if (tableType == TableType.OFFLINE) {
+ throw new ControllerApplicationException(LOGGER, "Table type : " +
tableTypeStr + " not yet supported.",
+ Response.Status.NOT_IMPLEMENTED);
+ }
+ String tableNameWithType =
+
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager,
tableName, tableType, LOGGER).get(0);
+
+ String validDocIdMetadata;
+ try {
+ TableMetadataReader tableMetadataReader =
+ new TableMetadataReader(_executor, _connectionManager,
_pinotHelixResourceManager);
+ JsonNode segmentsMetadataJson =
+
tableMetadataReader.getAggregateValidDocIdMetadata(tableNameWithType,
segmentNames,
+ _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+ validDocIdMetadata =
JsonUtils.objectToPrettyString(segmentsMetadataJson);
+ } catch (InvalidConfigException e) {
+ throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.BAD_REQUEST);
+ } catch (IOException ioe) {
+ throw new ControllerApplicationException(LOGGER, "Error parsing Pinot
server response: " + ioe.getMessage(),
+ Response.Status.INTERNAL_SERVER_ERROR, ioe);
+ }
+ return validDocIdMetadata;
+ }
+
@GET
@Path("tables/{tableName}/indexes")
@Authorize(targetType = TargetType.TABLE, paramName = "tableName", action =
Actions.Table.GET_METADATA)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java
index d26f5ba603..f0b36b2cfa 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
import org.apache.pinot.common.http.MultiHttpRequest;
@@ -79,12 +80,46 @@ public class CompletionServiceHelper {
public CompletionServiceResponse doMultiGetRequest(List<String> serverURLs,
String tableNameWithType,
boolean multiRequestPerServer, @Nullable Map<String, String>
requestHeaders, int timeoutMs,
@Nullable String useCase) {
- CompletionServiceResponse completionServiceResponse = new
CompletionServiceResponse();
-
// TODO: use some service other than completion service so that we know
which server encounters the error
CompletionService<MultiHttpRequestResponse> completionService =
- new MultiHttpRequest(_executor,
_httpConnectionManager).execute(serverURLs, requestHeaders, timeoutMs);
- for (int i = 0; i < serverURLs.size(); i++) {
+ new MultiHttpRequest(_executor,
_httpConnectionManager).executeGet(serverURLs, requestHeaders, timeoutMs);
+
+ return collectResponse(tableNameWithType, serverURLs.size(),
completionService, multiRequestPerServer, useCase);
+ }
+
+ /**
+ * This method makes a MultiPost call to all given URLs with their
corresponding request bodies.
+ * @param serverURLsAndRequestBodies server urls and request bodies to send
POST requests.
+ * @param tableNameWithType table name with type suffix
+ * @param multiRequestPerServer it's possible that we need to send multiple
requests to the same server.
+ * If multiRequestPerServer is set as false,
return as long as one of the requests get
+ * response; If multiRequestPerServer is set as
true, wait until all requests
+ * get response.
+ * @param requestHeaders Headers to be set when making the http calls.
+ * @param timeoutMs timeout in milliseconds to wait per request.
+ * @param useCase the use case initiating the multi-post request. If not null
and an exception is thrown, only the
+ * error message and the use case are logged instead of the
full stack trace.
+ * @return CompletionServiceResponse Map of the endpoint(server instance, or
full request path if
+ * multiRequestPerServer is true) to the response from that endpoint.
+ */
+ public CompletionServiceResponse doMultiPostRequest(List<Pair<String,
String>> serverURLsAndRequestBodies,
+ String tableNameWithType, boolean multiRequestPerServer, @Nullable
Map<String, String> requestHeaders,
+ int timeoutMs, @Nullable String useCase) {
+
+ CompletionService<MultiHttpRequestResponse> completionService =
+ new MultiHttpRequest(_executor,
_httpConnectionManager).executePost(serverURLsAndRequestBodies, requestHeaders,
+ timeoutMs);
+
+ return collectResponse(tableNameWithType,
serverURLsAndRequestBodies.size(), completionService,
+ multiRequestPerServer, useCase);
+ }
+
+ private CompletionServiceResponse collectResponse(String tableNameWithType,
int size,
+ CompletionService<MultiHttpRequestResponse> completionService, boolean
multiRequestPerServer,
+ @Nullable String useCase) {
+ CompletionServiceResponse completionServiceResponse = new
CompletionServiceResponse();
+
+ for (int i = 0; i < size; i++) {
MultiHttpRequestResponse multiHttpRequestResponse = null;
try {
multiHttpRequestResponse = completionService.take().get();
@@ -93,7 +128,8 @@ public class CompletionServiceHelper {
_endpointsToServers.get(String.format("%s://%s:%d",
uri.getScheme(), uri.getHost(), uri.getPort()));
int statusCode =
multiHttpRequestResponse.getResponse().getStatusLine().getStatusCode();
if (statusCode >= 300) {
- LOGGER.error("Server: {} returned error: {}", instance, statusCode);
+ String reason =
multiHttpRequestResponse.getResponse().getStatusLine().getReasonPhrase();
+ LOGGER.error("Server: {} returned error: {}, reason: {}", instance,
statusCode, reason);
completionServiceResponse._failedResponseCount++;
continue;
}
@@ -102,7 +138,7 @@ public class CompletionServiceHelper {
.put(multiRequestPerServer ? uri.toString() : instance,
responseString);
} catch (Exception e) {
String reason = useCase == null ? "" : String.format(" in '%s'",
useCase);
- LOGGER.error("Connection error{}. Details: {}", reason,
e.getMessage());
+ LOGGER.error("Connection error {}. Details: {}", reason,
e.getMessage());
completionServiceResponse._failedResponseCount++;
} finally {
if (multiHttpRequestResponse != null) {
@@ -116,9 +152,9 @@ public class CompletionServiceHelper {
}
int numServersResponded = completionServiceResponse._httpResponses.size();
- if (numServersResponded != serverURLs.size()) {
+ if (numServersResponded != size) {
LOGGER.warn("Finished reading information for table: {} with {}/{}
server responses", tableNameWithType,
- numServersResponded, serverURLs.size());
+ numServersResponded, size);
} else {
LOGGER.info("Finished reading information for table: {}",
tableNameWithType);
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
index a51881d9e3..de1cae193c 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
@@ -18,19 +18,35 @@
*/
package org.apache.pinot.controller.util;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Preconditions;
import com.google.common.collect.BiMap;
import java.io.IOException;
-import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import javax.annotation.Nullable;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.core.Response;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.http.conn.HttpClientConnectionManager;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.pinot.common.restlet.resources.TableMetadataInfo;
+import org.apache.pinot.common.restlet.resources.TableSegments;
+import org.apache.pinot.common.restlet.resources.ValidDocIdMetadataInfo;
+import org.apache.pinot.common.utils.RoaringBitmapUtils;
import org.apache.pinot.spi.utils.JsonUtils;
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.client.ClientProperties;
+import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +63,11 @@ public class ServerSegmentMetadataReader {
private final Executor _executor;
private final HttpClientConnectionManager _connectionManager;
+ public ServerSegmentMetadataReader() {
+ _executor =
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+ _connectionManager = new PoolingHttpClientConnectionManager();
+ }
+
public ServerSegmentMetadataReader(Executor executor,
HttpClientConnectionManager connectionManager) {
_executor = executor;
_connectionManager = connectionManager;
@@ -100,11 +121,11 @@ public class ServerSegmentMetadataReader {
tableMetadataInfo.getColumnCardinalityMap().forEach((k, v) ->
columnCardinalityMap.merge(k, v, Double::sum));
tableMetadataInfo.getMaxNumMultiValuesMap().forEach((k, v) ->
maxNumMultiValuesMap.merge(k, v, Double::sum));
tableMetadataInfo.getColumnIndexSizeMap().forEach((k, v) ->
columnIndexSizeMap.merge(k, v, (l, r) -> {
- for (Map.Entry<String, Double> e : r.entrySet()) {
- l.put(e.getKey(), l.getOrDefault(e.getKey(), 0d) + e.getValue());
- }
- return l;
- }));
+ for (Map.Entry<String, Double> e : r.entrySet()) {
+ l.put(e.getKey(), l.getOrDefault(e.getKey(), 0d) + e.getValue());
+ }
+ return l;
+ }));
} catch (IOException e) {
failedParses++;
LOGGER.error("Unable to parse server {} response due to an error: ",
streamResponse.getKey(), e);
@@ -180,27 +201,129 @@ public class ServerSegmentMetadataReader {
return segmentsMetadata;
}
+ /**
+ * This method is called when the API request is to fetch validDocId
metadata for a list of segments of the given table.
+ * This method will query the servers that host the target segments and fetch
the validDocId metadata results.
+ *
+ * @return a list of valid doc id metadata for the requested segments
+ */
+ public List<ValidDocIdMetadataInfo> getValidDocIdMetadataFromServer(String
tableNameWithType,
+ Map<String, List<String>> serverToSegmentsMap, BiMap<String, String>
serverToEndpoints,
+ @Nullable List<String> segmentNames, int timeoutMs) {
+ List<Pair<String, String>> serverURLsAndBodies = new ArrayList<>();
+ for (Map.Entry<String, List<String>> serverToSegments :
serverToSegmentsMap.entrySet()) {
+ List<String> segmentsForServer = serverToSegments.getValue();
+ List<String> segmentsToQuery = new ArrayList<>();
+ if (segmentNames == null || segmentNames.isEmpty()) {
+ segmentsToQuery.addAll(segmentsForServer);
+ } else {
+ Set<String> segmentNamesLookUpTable = new HashSet<>(segmentNames);
+ for (String segment : segmentsForServer) {
+ if (segmentNamesLookUpTable.contains(segment)) {
+ segmentsToQuery.add(segment);
+ }
+ }
+ }
+ serverURLsAndBodies.add(generateValidDocIdMetadataURL(tableNameWithType,
segmentsToQuery,
+ serverToEndpoints.get(serverToSegments.getKey())));
+ }
+
+ // request the urls from the servers
+ CompletionServiceHelper completionServiceHelper =
+ new CompletionServiceHelper(_executor, _connectionManager,
serverToEndpoints);
+
+ Map<String, String> requestHeaders = Map.of("Content-Type",
"application/json");
+ CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+ completionServiceHelper.doMultiPostRequest(serverURLsAndBodies,
tableNameWithType, false, requestHeaders,
+ timeoutMs, null);
+
+ List<ValidDocIdMetadataInfo> validDocIdMetadataInfos = new ArrayList<>();
+ int failedParses = 0;
+ int returnedSegmentsCount = 0;
+ for (Map.Entry<String, String> streamResponse :
serviceResponse._httpResponses.entrySet()) {
+ try {
+ String validDocIdMetadataList = streamResponse.getValue();
+ List<ValidDocIdMetadataInfo> validDocIdMetadataInfo =
+ JsonUtils.stringToObject(validDocIdMetadataList, new
TypeReference<ArrayList<ValidDocIdMetadataInfo>>() {
+ });
+ validDocIdMetadataInfos.addAll(validDocIdMetadataInfo);
+ returnedSegmentsCount++;
+ } catch (Exception e) {
+ failedParses++;
+ LOGGER.error("Unable to parse server {} response due to an error: ",
streamResponse.getKey(), e);
+ }
+ }
+ if (failedParses != 0) {
+ LOGGER.error("Unable to parse server {} / {} response due to an error:
", failedParses,
+ serverURLsAndBodies.size());
+ }
+
+ if (segmentNames != null && returnedSegmentsCount != segmentNames.size()) {
+ LOGGER.error("Unable to get validDocIdMetadata from all servers.
Expected: {}, Actual: {}", segmentNames.size(),
+ returnedSegmentsCount);
+ }
+ LOGGER.info("Retrieved valid doc id metadata for {} segments from {}
servers.", returnedSegmentsCount,
+ serverURLsAndBodies.size());
+ return validDocIdMetadataInfos;
+ }
+
+ /**
+ * This method is called when the API request is to fetch validDocIds for a
segment of the given table. This method
+ * will pick a server that hosts the target segment and fetch the
validDocIds result.
+ *
+ * @return a bitmap of validDocIds
+ */
+ public RoaringBitmap getValidDocIdsFromServer(String tableNameWithType,
String segmentName, String endpoint,
+ int timeoutMs) {
+ // Build the endpoint url
+ String url = generateValidDocIdsURL(tableNameWithType, segmentName,
endpoint);
+
+ // Set timeout
+ ClientConfig clientConfig = new ClientConfig();
+ clientConfig.property(ClientProperties.CONNECT_TIMEOUT, timeoutMs);
+ clientConfig.property(ClientProperties.READ_TIMEOUT, timeoutMs);
+
+ Response response =
ClientBuilder.newClient(clientConfig).target(url).request().get(Response.class);
+ Preconditions.checkState(response.getStatus() ==
Response.Status.OK.getStatusCode(),
+ "Unable to retrieve validDocIds from %s", url);
+ byte[] validDocIds = response.readEntity(byte[].class);
+ return RoaringBitmapUtils.deserialize(validDocIds);
+ }
+
private String generateAggregateSegmentMetadataServerURL(String
tableNameWithType, List<String> columns,
String endpoint) {
- try {
- tableNameWithType = URLEncoder.encode(tableNameWithType,
StandardCharsets.UTF_8.name());
- String paramsStr = generateColumnsParam(columns);
- return String.format("%s/tables/%s/metadata?%s", endpoint,
tableNameWithType, paramsStr);
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e.getCause());
- }
+ tableNameWithType = URLEncoder.encode(tableNameWithType,
StandardCharsets.UTF_8);
+ String paramsStr = generateColumnsParam(columns);
+ return String.format("%s/tables/%s/metadata?%s", endpoint,
tableNameWithType, paramsStr);
}
private String generateSegmentMetadataServerURL(String tableNameWithType,
String segmentName, List<String> columns,
String endpoint) {
+ tableNameWithType = URLEncoder.encode(tableNameWithType,
StandardCharsets.UTF_8);
+ segmentName = URLEncoder.encode(segmentName, StandardCharsets.UTF_8);
+ String paramsStr = generateColumnsParam(columns);
+ return String.format("%s/tables/%s/segments/%s/metadata?%s", endpoint,
tableNameWithType, segmentName, paramsStr);
+ }
+
+ private String generateValidDocIdsURL(String tableNameWithType, String
segmentName, String endpoint) {
+ tableNameWithType = URLEncoder.encode(tableNameWithType,
StandardCharsets.UTF_8);
+ segmentName = URLEncoder.encode(segmentName, StandardCharsets.UTF_8);
+ return String.format("%s/segments/%s/%s/validDocIds", endpoint,
tableNameWithType, segmentName);
+ }
+
+ private Pair<String, String> generateValidDocIdMetadataURL(String
tableNameWithType, List<String> segmentNames,
+ String endpoint) {
+ tableNameWithType = URLEncoder.encode(tableNameWithType,
StandardCharsets.UTF_8);
+ TableSegments tableSegments = new TableSegments(segmentNames);
+ String jsonTableSegments;
try {
- tableNameWithType = URLEncoder.encode(tableNameWithType,
StandardCharsets.UTF_8.name());
- segmentName = URLEncoder.encode(segmentName,
StandardCharsets.UTF_8.name());
- String paramsStr = generateColumnsParam(columns);
- return String.format("%s/tables/%s/segments/%s/metadata?%s", endpoint,
tableNameWithType, segmentName, paramsStr);
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e.getCause());
+ jsonTableSegments = JsonUtils.objectToString(tableSegments);
+ } catch (JsonProcessingException e) {
+ LOGGER.error("Failed to convert segment names to json request body:
segmentNames={}", segmentNames);
+ throw new RuntimeException(e);
}
+ return Pair.of(
+ String.format("%s/tables/%s/validDocIdMetadata", endpoint,
tableNameWithType), jsonTableSegments);
}
private String generateColumnsParam(List<String> columns) {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
index 7ef5302c1b..f92747b497 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
@@ -31,6 +31,7 @@ import java.util.stream.Collectors;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.restlet.resources.TableMetadataInfo;
+import org.apache.pinot.common.restlet.resources.ValidDocIdMetadataInfo;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -81,8 +82,9 @@ public class TableMetadataReader {
}
}
- List<String> segmentsMetadata = serverSegmentMetadataReader
- .getSegmentMetadataFromServer(tableNameWithType, serverToSegmentsMap,
endpoints, columns, timeoutMs);
+ List<String> segmentsMetadata =
+
serverSegmentMetadataReader.getSegmentMetadataFromServer(tableNameWithType,
serverToSegmentsMap, endpoints,
+ columns, timeoutMs);
Map<String, JsonNode> response = new HashMap<>();
for (String segmentMetadata : segmentsMetadata) {
JsonNode responseJson = JsonUtils.stringToJsonNode(segmentMetadata);
@@ -146,8 +148,28 @@ public class TableMetadataReader {
ServerSegmentMetadataReader serverSegmentMetadataReader =
new ServerSegmentMetadataReader(_executor, _connectionManager);
- TableMetadataInfo aggregateTableMetadataInfo = serverSegmentMetadataReader
- .getAggregatedTableMetadataFromServer(tableNameWithType, endpoints,
columns, numReplica, timeoutMs);
+ TableMetadataInfo aggregateTableMetadataInfo =
+
serverSegmentMetadataReader.getAggregatedTableMetadataFromServer(tableNameWithType,
endpoints, columns,
+ numReplica, timeoutMs);
+ return JsonUtils.objectToJsonNode(aggregateTableMetadataInfo);
+ }
+
+ /**
+ * This method retrieves the aggregated valid doc id metadata for a given
table.
+ * @return a list of ValidDocIdMetadataInfo
+ */
+ public JsonNode getAggregateValidDocIdMetadata(String tableNameWithType,
List<String> segmentNames, int timeoutMs)
+ throws InvalidConfigException {
+ final Map<String, List<String>> serverToSegments =
+ _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+ BiMap<String, String> endpoints =
+
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+ ServerSegmentMetadataReader serverSegmentMetadataReader =
+ new ServerSegmentMetadataReader(_executor, _connectionManager);
+
+ List<ValidDocIdMetadataInfo> aggregateTableMetadataInfo =
+
serverSegmentMetadataReader.getValidDocIdMetadataFromServer(tableNameWithType,
serverToSegments, endpoints,
+ segmentNames, timeoutMs);
return JsonUtils.objectToJsonNode(aggregateTableMetadataInfo);
}
}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
index a46f903ed9..31f5d6039c 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
@@ -21,15 +21,23 @@ package org.apache.pinot.plugin.minion.tasks;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.utils.config.InstanceUtils;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
+import org.apache.pinot.controller.util.ServerSegmentMetadataReader;
+import org.apache.pinot.minion.MinionContext;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.LocalPinotFS;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -130,4 +138,37 @@ public class MinionTaskUtils {
}
return dirInStr;
}
+
+ public static RoaringBitmap getValidDocIds(String tableNameWithType, String
segmentName, Map<String, String> configs,
+ MinionContext minionContext) {
+ HelixAdmin helixAdmin =
minionContext.getHelixManager().getClusterManagmentTool();
+ String clusterName = minionContext.getHelixManager().getClusterName();
+
+ String server = getServer(segmentName, tableNameWithType, helixAdmin,
clusterName);
+ InstanceConfig instanceConfig = helixAdmin.getInstanceConfig(clusterName,
server);
+ String endpoint = InstanceUtils.getServerAdminEndpoint(instanceConfig);
+
+ // We only need aggregated table size and the total number of docs/rows.
Skipping column related stats, by
+ // passing an empty list.
+ ServerSegmentMetadataReader serverSegmentMetadataReader = new
ServerSegmentMetadataReader();
+ return
serverSegmentMetadataReader.getValidDocIdsFromServer(tableNameWithType,
segmentName, endpoint, 60_000);
+ }
+
+ public static String getServer(String segmentName, String tableNameWithType,
HelixAdmin helixAdmin,
+ String clusterName) {
+ ExternalView externalView =
helixAdmin.getResourceExternalView(clusterName, tableNameWithType);
+ if (externalView == null) {
+ throw new IllegalStateException("External view does not exist for table:
" + tableNameWithType);
+ }
+ Map<String, String> instanceStateMap =
externalView.getStateMap(segmentName);
+ if (instanceStateMap == null) {
+ throw new IllegalStateException("Failed to find segment: " +
segmentName);
+ }
+ for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
+ if
(entry.getValue().equals(CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE))
{
+ return entry.getKey();
+ }
+ }
+ throw new IllegalStateException("Failed to find ONLINE server for segment:
" + segmentName);
+ }
}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
index aa37ac871a..4c200b9606 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java
@@ -18,118 +18,28 @@
*/
package org.apache.pinot.plugin.minion.tasks.upsertcompaction;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import java.io.File;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
-import java.util.Set;
-import javax.annotation.Nullable;
-import javax.ws.rs.client.ClientBuilder;
-import javax.ws.rs.core.Response;
import org.apache.commons.io.FileUtils;
-import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixManager;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.http.client.utils.URIBuilder;
import
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
-import org.apache.pinot.common.utils.config.InstanceUtils;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
import
org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor;
+import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
+import
org.apache.pinot.segment.local.segment.readers.CompactedPinotSegmentRecordReader;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.data.readers.RecordReader;
-import org.apache.pinot.spi.data.readers.RecordReaderConfig;
-import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-import org.roaringbitmap.PeekableIntIterator;
-import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class UpsertCompactionTaskExecutor extends
BaseSingleSegmentConversionExecutor {
private static final Logger LOGGER =
LoggerFactory.getLogger(UpsertCompactionTaskExecutor.class);
- private static HelixManager _helixManager = MINION_CONTEXT.getHelixManager();
- private static HelixAdmin _clusterManagementTool =
_helixManager.getClusterManagmentTool();
- private static String _clusterName = _helixManager.getClusterName();
-
- private class CompactedRecordReader implements RecordReader {
- private final PinotSegmentRecordReader _pinotSegmentRecordReader;
- private final PeekableIntIterator _validDocIdsIterator;
- // Reusable generic row to store the next row to return
- GenericRow _nextRow = new GenericRow();
- // Flag to mark whether we need to fetch another row
- boolean _nextRowReturned = true;
-
- CompactedRecordReader(File indexDir, ImmutableRoaringBitmap validDocIds) {
- _pinotSegmentRecordReader = new PinotSegmentRecordReader();
- _pinotSegmentRecordReader.init(indexDir, null, null);
- _validDocIdsIterator = validDocIds.getIntIterator();
- }
-
- @Override
- public void init(File dataFile, Set<String> fieldsToRead, @Nullable
RecordReaderConfig recordReaderConfig) {
- }
-
- @Override
- public boolean hasNext() {
- if (!_validDocIdsIterator.hasNext() && _nextRowReturned) {
- return false;
- }
-
- // If next row has not been returned, return true
- if (!_nextRowReturned) {
- return true;
- }
-
- // Try to get the next row to return
- if (_validDocIdsIterator.hasNext()) {
- int docId = _validDocIdsIterator.next();
- _nextRow.clear();
- _pinotSegmentRecordReader.getRecord(docId, _nextRow);
- _nextRowReturned = false;
- return true;
- }
-
- // Cannot find next row to return, return false
- return false;
- }
-
- @Override
- public GenericRow next() {
- return next(new GenericRow());
- }
-
- @Override
- public GenericRow next(GenericRow reuse) {
- Preconditions.checkState(!_nextRowReturned);
- reuse.init(_nextRow);
- _nextRowReturned = true;
- return reuse;
- }
-
- @Override
- public void rewind() {
- _pinotSegmentRecordReader.rewind();
- _nextRowReturned = true;
- }
-
- @Override
- public void close()
- throws IOException {
- _pinotSegmentRecordReader.close();
- }
- }
@Override
protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig,
File indexDir, File workingDir)
@@ -143,7 +53,7 @@ public class UpsertCompactionTaskExecutor extends
BaseSingleSegmentConversionExe
String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
TableConfig tableConfig = getTableConfig(tableNameWithType);
- ImmutableRoaringBitmap validDocIds = getValidDocIds(tableNameWithType,
configs);
+ RoaringBitmap validDocIds =
MinionTaskUtils.getValidDocIds(tableNameWithType, segmentName, configs,
MINION_CONTEXT);
if (validDocIds.isEmpty()) {
// prevents empty segment generation
@@ -159,7 +69,8 @@ public class UpsertCompactionTaskExecutor extends
BaseSingleSegmentConversionExe
}
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
- try (CompactedRecordReader compactedRecordReader = new
CompactedRecordReader(indexDir, validDocIds)) {
+ try (CompactedPinotSegmentRecordReader compactedRecordReader = new
CompactedPinotSegmentRecordReader(indexDir,
+ validDocIds)) {
SegmentGeneratorConfig config = getSegmentGeneratorConfig(workingDir,
tableConfig, segmentMetadata, segmentName);
SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
driver.init(config, compactedRecordReader);
@@ -198,46 +109,6 @@ public class UpsertCompactionTaskExecutor extends
BaseSingleSegmentConversionExe
return config;
}
- // TODO: Consider moving this method to a more appropriate class (eg
ServerSegmentMetadataReader)
- private static ImmutableRoaringBitmap getValidDocIds(String
tableNameWithType, Map<String, String> configs)
- throws URISyntaxException {
- String segmentName = configs.get(MinionConstants.SEGMENT_NAME_KEY);
- String server = getServer(segmentName, tableNameWithType);
-
- // get the url for the validDocIds for the server
- InstanceConfig instanceConfig =
_clusterManagementTool.getInstanceConfig(_clusterName, server);
- String endpoint = InstanceUtils.getServerAdminEndpoint(instanceConfig);
- String url =
- new
URIBuilder(endpoint).setPath(String.format("/segments/%s/%s/validDocIds",
tableNameWithType, segmentName))
- .toString();
-
- // get the validDocIds from that server
- Response response =
ClientBuilder.newClient().target(url).request().get(Response.class);
- Preconditions.checkState(response.getStatus() ==
Response.Status.OK.getStatusCode(),
- "Unable to retrieve validDocIds from %s", url);
- byte[] snapshot = response.readEntity(byte[].class);
- ImmutableRoaringBitmap validDocIds = new
ImmutableRoaringBitmap(ByteBuffer.wrap(snapshot));
- return validDocIds;
- }
-
- @VisibleForTesting
- public static String getServer(String segmentName, String tableNameWithType)
{
- ExternalView externalView =
_clusterManagementTool.getResourceExternalView(_clusterName, tableNameWithType);
- if (externalView == null) {
- throw new IllegalStateException("External view does not exist for table:
" + tableNameWithType);
- }
- Map<String, String> instanceStateMap =
externalView.getStateMap(segmentName);
- if (instanceStateMap == null) {
- throw new IllegalStateException("Failed to find segment: " +
segmentName);
- }
- for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) {
- if (entry.getValue().equals(SegmentStateModel.ONLINE)) {
- return entry.getKey();
- }
- }
- throw new IllegalStateException("Failed to find ONLINE server for segment:
" + segmentName);
- }
-
@Override
protected SegmentZKMetadataCustomMapModifier
getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig,
SegmentConversionResult segmentConversionResult) {
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
index 590102319e..e6eaf3679e 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
@@ -18,26 +18,20 @@
*/
package org.apache.pinot.plugin.minion.tasks.upsertcompaction;
-import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.BiMap;
-import java.io.IOException;
-import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
-import org.apache.http.client.utils.URIBuilder;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.restlet.resources.ValidDocIdMetadataInfo;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import
org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
-import org.apache.pinot.controller.util.CompletionServiceHelper;
+import org.apache.pinot.controller.util.ServerSegmentMetadataReader;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.common.MinionConstants.UpsertCompactionTask;
import org.apache.pinot.core.minion.PinotTaskConfig;
@@ -45,7 +39,6 @@ import org.apache.pinot.spi.annotations.minion.TaskGenerator;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -89,6 +82,7 @@ public class UpsertCompactionTaskGenerator extends
BaseTaskGenerator {
List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
for (TableConfig tableConfig : tableConfigs) {
if (!validate(tableConfig)) {
+ LOGGER.warn("Validation failed for table {}. Skipping..",
tableConfig.getTableName());
continue;
}
@@ -103,6 +97,8 @@ public class UpsertCompactionTaskGenerator extends
BaseTaskGenerator {
continue;
}
+ // TODO: add a check to see if the task is already running for the table
+
// get server to segment mappings
PinotHelixResourceManager pinotHelixResourceManager =
_clusterInfoAccessor.getPinotHelixResourceManager();
Map<String, List<String>> serverToSegments =
pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
@@ -113,27 +109,21 @@ public class UpsertCompactionTaskGenerator extends
BaseTaskGenerator {
throw new RuntimeException(e);
}
- Map<String, SegmentZKMetadata> completedSegmentsMap =
-
completedSegments.stream().collect(Collectors.toMap(SegmentZKMetadata::getSegmentName,
Function.identity()));
+ ServerSegmentMetadataReader serverSegmentMetadataReader =
+ new ServerSegmentMetadataReader(_clusterInfoAccessor.getExecutor(),
+ _clusterInfoAccessor.getConnectionManager());
- List<String> validDocIdUrls;
- try {
- validDocIdUrls = getValidDocIdMetadataUrls(serverToSegments,
serverToEndpoints, tableNameWithType,
- completedSegmentsMap.keySet());
- } catch (URISyntaxException e) {
- throw new RuntimeException(e);
- }
+ // TODO: currently, we put segmentNames=null to get metadata for all
segments. We can change this to get
+ // valid doc id metadata in batches with the loop.
+ List<ValidDocIdMetadataInfo> validDocIdMetadataList =
+
serverSegmentMetadataReader.getValidDocIdMetadataFromServer(tableNameWithType,
serverToSegments,
+ serverToEndpoints, null, 60_000);
- // request the urls from the servers
- CompletionServiceHelper completionServiceHelper =
- new CompletionServiceHelper(_clusterInfoAccessor.getExecutor(),
_clusterInfoAccessor.getConnectionManager(),
- serverToEndpoints.inverse());
-
- CompletionServiceHelper.CompletionServiceResponse serviceResponse =
- completionServiceHelper.doMultiGetRequest(validDocIdUrls,
tableNameWithType, false, 3000);
+ Map<String, SegmentZKMetadata> completedSegmentsMap =
+
completedSegments.stream().collect(Collectors.toMap(SegmentZKMetadata::getSegmentName,
Function.identity()));
SegmentSelectionResult segmentSelectionResult =
- processValidDocIdMetadata(taskConfigs, completedSegmentsMap,
serviceResponse._httpResponses.entrySet());
+ processValidDocIdMetadata(taskConfigs, completedSegmentsMap,
validDocIdMetadataList);
if (!segmentSelectionResult.getSegmentsForDeletion().isEmpty()) {
pinotHelixResourceManager.deleteSegments(tableNameWithType,
segmentSelectionResult.getSegmentsForDeletion(),
@@ -163,7 +153,7 @@ public class UpsertCompactionTaskGenerator extends
BaseTaskGenerator {
@VisibleForTesting
public static SegmentSelectionResult processValidDocIdMetadata(Map<String,
String> taskConfigs,
- Map<String, SegmentZKMetadata> completedSegmentsMap,
Set<Map.Entry<String, String>> responseSet) {
+ Map<String, SegmentZKMetadata> completedSegmentsMap,
List<ValidDocIdMetadataInfo> validDocIdMetadataInfoList) {
double invalidRecordsThresholdPercent = Double.parseDouble(
taskConfigs.getOrDefault(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT,
String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT)));
@@ -172,62 +162,22 @@ public class UpsertCompactionTaskGenerator extends
BaseTaskGenerator {
String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT)));
List<SegmentZKMetadata> segmentsForCompaction = new ArrayList<>();
List<String> segmentsForDeletion = new ArrayList<>();
- for (Map.Entry<String, String> streamResponse : responseSet) {
- JsonNode allValidDocIdMetadata;
- try {
- allValidDocIdMetadata =
JsonUtils.stringToJsonNode(streamResponse.getValue());
- } catch (IOException e) {
- LOGGER.error("Unable to parse validDocIdMetadata response for: {}",
streamResponse.getKey());
- continue;
- }
- Iterator<JsonNode> iterator = allValidDocIdMetadata.elements();
- while (iterator.hasNext()) {
- JsonNode validDocIdMetadata = iterator.next();
- long totalInvalidDocs =
validDocIdMetadata.get("totalInvalidDocs").asLong();
- String segmentName = validDocIdMetadata.get("segmentName").asText();
- SegmentZKMetadata segment = completedSegmentsMap.get(segmentName);
- long totalDocs = validDocIdMetadata.get("totalDocs").asLong();
- double invalidRecordPercent = ((double) totalInvalidDocs / totalDocs)
* 100;
- if (totalInvalidDocs == totalDocs) {
- segmentsForDeletion.add(segment.getSegmentName());
- } else if (invalidRecordPercent > invalidRecordsThresholdPercent
- && totalInvalidDocs > invalidRecordsThresholdCount) {
- segmentsForCompaction.add(segment);
- }
+ for (ValidDocIdMetadataInfo validDocIdMetadata :
validDocIdMetadataInfoList) {
+ long totalInvalidDocs = validDocIdMetadata.getTotalInvalidDocs();
+ String segmentName = validDocIdMetadata.getSegmentName();
+ SegmentZKMetadata segment = completedSegmentsMap.get(segmentName);
+ long totalDocs = validDocIdMetadata.getTotalDocs();
+ double invalidRecordPercent = ((double) totalInvalidDocs / totalDocs) *
100;
+ if (totalInvalidDocs == totalDocs) {
+ segmentsForDeletion.add(segment.getSegmentName());
+ } else if (invalidRecordPercent > invalidRecordsThresholdPercent
+ && totalInvalidDocs > invalidRecordsThresholdCount) {
+ segmentsForCompaction.add(segment);
}
}
return new SegmentSelectionResult(segmentsForCompaction,
segmentsForDeletion);
}
- @VisibleForTesting
- public static List<String> getValidDocIdMetadataUrls(Map<String,
List<String>> serverToSegments,
- BiMap<String, String> serverToEndpoints, String tableNameWithType,
Set<String> completedSegments)
- throws URISyntaxException {
- Set<String> remainingSegments = new HashSet<>(completedSegments);
- List<String> urls = new ArrayList<>();
- for (Map.Entry<String, List<String>> entry : serverToSegments.entrySet()) {
- if (remainingSegments.isEmpty()) {
- break;
- }
- String server = entry.getKey();
- List<String> segmentNames = entry.getValue();
- URIBuilder uriBuilder = new
URIBuilder(serverToEndpoints.get(server)).setPath(
- String.format("/tables/%s/validDocIdMetadata", tableNameWithType));
- int completedSegmentCountPerServer = 0;
- for (String segmentName : segmentNames) {
- if (remainingSegments.remove(segmentName)) {
- completedSegmentCountPerServer++;
- uriBuilder.addParameter("segmentNames", segmentName);
- }
- }
- if (completedSegmentCountPerServer > 0) {
- // only add to the list if the server has completed segments
- urls.add(uriBuilder.toString());
- }
- }
- return urls;
- }
-
private List<SegmentZKMetadata> getCompletedSegments(String
tableNameWithType, Map<String, String> taskConfigs) {
List<SegmentZKMetadata> completedSegments = new ArrayList<>();
String bufferPeriod =
taskConfigs.getOrDefault(UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY,
DEFAULT_BUFFER_PERIOD);
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutorTest.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutorTest.java
index 604c58f6d0..0869880ccf 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutorTest.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutorTest.java
@@ -24,6 +24,7 @@ import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.model.ExternalView;
import org.apache.pinot.minion.MinionContext;
+import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.mockito.Mockito;
import org.testng.Assert;
@@ -51,13 +52,15 @@ public class UpsertCompactionTaskExecutorTest {
Mockito.when(helixManager.getClusterManagmentTool()).thenReturn(clusterManagementTool);
minionContext.setHelixManager(helixManager);
- String server = UpsertCompactionTaskExecutor.getServer(SEGMENT_NAME,
REALTIME_TABLE_NAME);
+ String server = MinionTaskUtils.getServer(SEGMENT_NAME,
REALTIME_TABLE_NAME, helixManager.getClusterManagmentTool(),
+ helixManager.getClusterName());
Assert.assertEquals(server, "server1");
// verify exception thrown with OFFLINE server
map.put("server1", SegmentStateModel.OFFLINE);
Assert.assertThrows(IllegalStateException.class,
- () -> UpsertCompactionTaskExecutor.getServer(SEGMENT_NAME,
REALTIME_TABLE_NAME));
+ () -> MinionTaskUtils.getServer(SEGMENT_NAME, REALTIME_TABLE_NAME,
+ helixManager.getClusterManagmentTool(),
helixManager.getClusterName()));
}
}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
index 03908a958e..7da9a7f1e8 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
@@ -18,20 +18,17 @@
*/
package org.apache.pinot.plugin.minion.tasks.upsertcompaction;
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
-import java.net.URISyntaxException;
-import java.util.AbstractMap;
+import com.fasterxml.jackson.core.type.TypeReference;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.restlet.resources.ValidDocIdMetadataInfo;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.common.MinionConstants.UpsertCompactionTask;
@@ -41,6 +38,7 @@ import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.TimeUtils;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.BeforeClass;
@@ -171,57 +169,6 @@ public class UpsertCompactionTaskGeneratorTest {
assertEquals(pinotTaskConfigs.size(), 0);
}
- @Test
- public void testGetValidDocIdMetadataUrls()
- throws URISyntaxException {
- Map<String, List<String>> serverToSegments = new HashMap<>();
- serverToSegments.put("server1",
- Lists.newArrayList(_completedSegment.getSegmentName(),
_completedSegment2.getSegmentName()));
- serverToSegments.put("server2", Lists.newArrayList("consumingSegment"));
- BiMap<String, String> serverToEndpoints = HashBiMap.create(1);
- serverToEndpoints.put("server1", "http://endpoint1");
- serverToEndpoints.put("server2", "http://endpoint2");
- Set<String> completedSegments = new HashSet<>();
- completedSegments.add(_completedSegment.getSegmentName());
- completedSegments.add(_completedSegment2.getSegmentName());
-
- List<String> validDocIdUrls =
-
UpsertCompactionTaskGenerator.getValidDocIdMetadataUrls(serverToSegments,
serverToEndpoints,
- REALTIME_TABLE_NAME, completedSegments);
-
- String expectedUrl =
-
String.format("%s/tables/%s/validDocIdMetadata?segmentNames=%s&segmentNames=%s",
"http://endpoint1",
- REALTIME_TABLE_NAME, _completedSegment.getSegmentName(),
_completedSegment2.getSegmentName());
- assertEquals(validDocIdUrls.get(0), expectedUrl);
- assertEquals(validDocIdUrls.size(), 1);
- }
-
- @Test
- public void testGetValidDocIdMetadataUrlsWithReplicatedSegments()
- throws URISyntaxException {
- Map<String, List<String>> serverToSegments = new LinkedHashMap<>();
- serverToSegments.put("server1",
- Lists.newArrayList(_completedSegment.getSegmentName(),
_completedSegment2.getSegmentName()));
- serverToSegments.put("server2",
- Lists.newArrayList(_completedSegment.getSegmentName(),
_completedSegment2.getSegmentName()));
- BiMap<String, String> serverToEndpoints = HashBiMap.create(1);
- serverToEndpoints.put("server1", "http://endpoint1");
- serverToEndpoints.put("server2", "http://endpoint2");
- Set<String> completedSegments = new HashSet<>();
- completedSegments.add(_completedSegment.getSegmentName());
- completedSegments.add(_completedSegment2.getSegmentName());
-
- List<String> validDocIdUrls =
-
UpsertCompactionTaskGenerator.getValidDocIdMetadataUrls(serverToSegments,
serverToEndpoints,
- REALTIME_TABLE_NAME, completedSegments);
-
- String expectedUrl =
-
String.format("%s/tables/%s/validDocIdMetadata?segmentNames=%s&segmentNames=%s",
"http://endpoint1",
- REALTIME_TABLE_NAME, _completedSegment.getSegmentName(),
_completedSegment2.getSegmentName());
- assertEquals(validDocIdUrls.get(0), expectedUrl);
- assertEquals(validDocIdUrls.size(), 1);
- }
-
@Test
public void testGetMaxTasks() {
Map<String, String> taskConfigs = new HashMap<>();
@@ -234,16 +181,20 @@ public class UpsertCompactionTaskGeneratorTest {
}
@Test
- public void testProcessValidDocIdMetadata() {
+ public void testProcessValidDocIdMetadata()
+ throws IOException {
Map<String, String> compactionConfigs = getCompactionConfigs("1", "10");
- Set<Map.Entry<String, String>> responseSet = new HashSet<>();
+ List<ValidDocIdMetadataInfo> validDocIdMetadataInfoList = new
ArrayList<>();
String json = "[{" + "\"totalValidDocs\" : 50," + "\"totalInvalidDocs\" :
50," + "\"segmentName\" : \""
+ _completedSegment.getSegmentName() + "\"," + "\"totalDocs\" : 100" +
"}," + "{" + "\"totalValidDocs\" : 0,"
+ "\"totalInvalidDocs\" : 10," + "\"segmentName\" : \"" +
_completedSegment2.getSegmentName() + "\","
+ "\"totalDocs\" : 10" + "}]";
- responseSet.add(new AbstractMap.SimpleEntry<>("", json));
+ List<ValidDocIdMetadataInfo> validDocIdMetadataInfo =
+ JsonUtils.stringToObject(json, new
TypeReference<ArrayList<ValidDocIdMetadataInfo>>() {
+ });
UpsertCompactionTaskGenerator.SegmentSelectionResult
segmentSelectionResult =
-
UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs,
_completedSegmentsMap, responseSet);
+
UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs,
_completedSegmentsMap,
+ validDocIdMetadataInfo);
assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(),
_completedSegment.getSegmentName());
assertEquals(segmentSelectionResult.getSegmentsForDeletion().get(0),
_completedSegment2.getSegmentName());
@@ -251,20 +202,23 @@ public class UpsertCompactionTaskGeneratorTest {
// test with a higher invalidRecordsThresholdPercent
compactionConfigs = getCompactionConfigs("60", "10");
segmentSelectionResult =
-
UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs,
_completedSegmentsMap, responseSet);
+
UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs,
_completedSegmentsMap,
+ validDocIdMetadataInfo);
assertTrue(segmentSelectionResult.getSegmentsForCompaction().isEmpty());
// test without an invalidRecordsThresholdPercent
compactionConfigs = getCompactionConfigs("0", "10");
segmentSelectionResult =
-
UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs,
_completedSegmentsMap, responseSet);
+
UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs,
_completedSegmentsMap,
+ validDocIdMetadataInfo);
assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(),
_completedSegment.getSegmentName());
// test without a invalidRecordsThresholdCount
compactionConfigs = getCompactionConfigs("30", "0");
segmentSelectionResult =
-
UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs,
_completedSegmentsMap, responseSet);
+
UpsertCompactionTaskGenerator.processValidDocIdMetadata(compactionConfigs,
_completedSegmentsMap,
+ validDocIdMetadataInfo);
assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(),
_completedSegment.getSegmentName());
}
@@ -284,7 +238,7 @@ public class UpsertCompactionTaskGeneratorTest {
private IdealState getIdealState(String tableName, List<String>
segmentNames) {
IdealState idealState = new IdealState(tableName);
idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
- for (String segmentName: segmentNames) {
+ for (String segmentName : segmentNames) {
idealState.setPartitionState(segmentName, "Server_0", "ONLINE");
}
return idealState;
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReader.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReader.java
new file mode 100644
index 0000000000..2795982ab6
--- /dev/null
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/CompactedPinotSegmentRecordReader.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.readers;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+import org.roaringbitmap.PeekableIntIterator;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * Compacted Pinot Segment Record Reader used for upsert compaction
+ */
+public class CompactedPinotSegmentRecordReader implements RecordReader {
+ private final PinotSegmentRecordReader _pinotSegmentRecordReader;
+ private final PeekableIntIterator _validDocIdsIterator;
+ // Reusable generic row to store the next row to return
+ GenericRow _nextRow = new GenericRow();
+ // Flag to mark whether we need to fetch another row
+ boolean _nextRowReturned = true;
+
+ public CompactedPinotSegmentRecordReader(File indexDir, RoaringBitmap
validDocIds) {
+ _pinotSegmentRecordReader = new PinotSegmentRecordReader();
+ _pinotSegmentRecordReader.init(indexDir, null, null);
+ _validDocIdsIterator = validDocIds.getIntIterator();
+ }
+
+ @Override
+ public void init(File dataFile, @Nullable Set<String> fieldsToRead,
@Nullable RecordReaderConfig recordReaderConfig)
+ throws IOException {
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (!_validDocIdsIterator.hasNext() && _nextRowReturned) {
+ return false;
+ }
+
+ // If next row has not been returned, return true
+ if (!_nextRowReturned) {
+ return true;
+ }
+
+ // Try to get the next row to return
+ if (_validDocIdsIterator.hasNext()) {
+ int docId = _validDocIdsIterator.next();
+ _nextRow.clear();
+ _pinotSegmentRecordReader.getRecord(docId, _nextRow);
+ _nextRowReturned = false;
+ return true;
+ }
+
+ // Cannot find next row to return, return false
+ return false;
+ }
+
+ @Override
+ public GenericRow next()
+ throws IOException {
+ return next(new GenericRow());
+ }
+
+ @Override
+ public GenericRow next(GenericRow reuse)
+ throws IOException {
+ Preconditions.checkState(!_nextRowReturned);
+ reuse.init(_nextRow);
+ _nextRowReturned = true;
+ return reuse;
+ }
+
+ @Override
+ public void rewind()
+ throws IOException {
+ _pinotSegmentRecordReader.rewind();
+ _nextRowReturned = true;
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ _pinotSegmentRecordReader.close();
+ }
+}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index b08833b966..29f90715da 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -529,31 +529,64 @@ public class TablesResource {
public String getValidDocIdMetadata(
@ApiParam(value = "Table name including type", required = true, example
= "myTable_REALTIME")
@PathParam("tableNameWithType") String tableNameWithType,
- @ApiParam(value = "Segment name", allowMultiple = true, required = true)
@QueryParam("segmentNames")
+ @ApiParam(value = "Segment name", allowMultiple = true)
@QueryParam("segmentNames")
List<String> segmentNames) {
+ return
ResourceUtils.convertToJsonString(processValidDocIdMetadata(tableNameWithType,
segmentNames));
+ }
+
+ @POST
+ @Path("/tables/{tableNameWithType}/validDocIdMetadata")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Provides segment validDocId metadata", notes =
"Provides segment validDocId metadata")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Success"),
+ @ApiResponse(code = 500, message = "Internal server error", response =
ErrorInfo.class),
+ @ApiResponse(code = 404, message = "Table or segment not found",
response = ErrorInfo.class)
+ })
+ public String getValidDocIdMetadata(
+ @ApiParam(value = "Table name including type", required = true, example
= "myTable_REALTIME")
+ @PathParam("tableNameWithType") String tableNameWithType, TableSegments
tableSegments) {
+ List<String> segmentNames = tableSegments.getSegments();
+ return
ResourceUtils.convertToJsonString(processValidDocIdMetadata(tableNameWithType,
segmentNames));
+ }
+
+ private List<Map<String, Object>> processValidDocIdMetadata(String
tableNameWithType, List<String> segments) {
TableDataManager tableDataManager =
ServerResourceUtils.checkGetTableDataManager(_serverInstance,
tableNameWithType);
List<String> missingSegments = new ArrayList<>();
- List<SegmentDataManager> segmentDataManagers =
tableDataManager.acquireSegments(segmentNames, missingSegments);
- if (!missingSegments.isEmpty()) {
- throw new WebApplicationException(String.format("Table %s has missing
segments", tableNameWithType),
- Response.Status.NOT_FOUND);
+ List<SegmentDataManager> segmentDataManagers;
+ if (segments == null) {
+ segmentDataManagers = tableDataManager.acquireAllSegments();
+ } else {
+ segmentDataManagers = tableDataManager.acquireSegments(segments,
missingSegments);
+ if (!missingSegments.isEmpty()) {
+ throw new WebApplicationException(
+ String.format("Table %s has missing segments: %s)",
tableNameWithType, segments),
+ Response.Status.NOT_FOUND);
+ }
}
List<Map<String, Object>> allValidDocIdMetadata = new ArrayList<>();
for (SegmentDataManager segmentDataManager : segmentDataManagers) {
try {
IndexSegment indexSegment = segmentDataManager.getSegment();
+ if (indexSegment == null) {
+ LOGGER.warn("Table {} segment {} does not exist", tableNameWithType,
segmentDataManager.getSegmentName());
+ continue;
+ }
+ // Skip the consuming segments
if (!(indexSegment instanceof ImmutableSegmentImpl)) {
- throw new WebApplicationException(
- String.format("Table %s segment %s is not a immutable segment",
tableNameWithType,
- segmentDataManager.getSegmentName()),
Response.Status.BAD_REQUEST);
+ String msg = String.format("Table %s segment %s is not a immutable
segment", tableNameWithType,
+ segmentDataManager.getSegmentName());
+ LOGGER.warn(msg);
+ continue;
}
MutableRoaringBitmap validDocIds =
indexSegment.getValidDocIds() != null ?
indexSegment.getValidDocIds().getMutableRoaringBitmap() : null;
if (validDocIds == null) {
- throw new WebApplicationException(
- String.format("Missing validDocIds for table %s segment %s does
not exist", tableNameWithType,
- segmentDataManager.getSegmentName()),
Response.Status.NOT_FOUND);
+ String msg = String.format("Missing validDocIds for table %s segment
%s does not exist", tableNameWithType,
+ segmentDataManager.getSegmentName());
+ LOGGER.warn(msg);
+ throw new WebApplicationException(msg, Response.Status.NOT_FOUND);
}
Map<String, Object> validDocIdMetadata = new HashMap<>();
int totalDocs = indexSegment.getSegmentMetadata().getTotalDocs();
@@ -568,7 +601,7 @@ public class TablesResource {
tableDataManager.releaseSegment(segmentDataManager);
}
}
- return ResourceUtils.convertToJsonString(allValidDocIdMetadata);
+ return allValidDocIdMetadata;
}
/**
diff --git
a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
index 0469d805d2..79b17396de 100644
---
a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
+++
b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import javax.ws.rs.client.Entity;
import javax.ws.rs.core.Response;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.commons.io.FileUtils;
@@ -311,6 +312,27 @@ public class TablesResourceTest extends BaseResourceTest {
Assert.assertEquals(validDocIdMetadata.get("totalInvalidDocs").asInt(),
99992);
}
+ @Test
+ public void testValidDocIdMetadataPost()
+ throws IOException {
+ IndexSegment segment = _realtimeIndexSegments.get(0);
+ // Verify the content of the downloaded snapshot from a realtime table.
+
downLoadAndVerifyValidDocIdsSnapshot(TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME),
+ (ImmutableSegmentImpl) segment);
+ List<String> segments = List.of(segment.getSegmentName());
+ TableSegments tableSegments = new TableSegments(segments);
+ String validDocIdMetadataPath =
+ "/tables/" + TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) +
"/validDocIdMetadata";
+ String response =
+ _webTarget.path(validDocIdMetadataPath).queryParam("segmentNames",
segment.getSegmentName()).request()
+ .post(Entity.json(tableSegments), String.class);
+ JsonNode validDocIdMetadata = JsonUtils.stringToJsonNode(response).get(0);
+
+ Assert.assertEquals(validDocIdMetadata.get("totalDocs").asInt(), 100000);
+ Assert.assertEquals(validDocIdMetadata.get("totalValidDocs").asInt(), 8);
+ Assert.assertEquals(validDocIdMetadata.get("totalInvalidDocs").asInt(),
99992);
+ }
+
// Verify metadata file from segments.
private void downLoadAndVerifySegmentContent(String tableNameWithType,
IndexSegment segment)
throws IOException, ConfigurationException {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]