This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch release-0.11-rc
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/release-0.11-rc by this push:
new 58e2740acd Force commit consuming segments (#9197) (#9315)
58e2740acd is described below
commit 58e2740acd410ba81efdca3a4774a2eb1366fdc9
Author: Rong Rong <[email protected]>
AuthorDate: Wed Aug 31 20:05:03 2022 -0700
Force commit consuming segments (#9197) (#9315)
---
.../api/resources/PinotRealtimeTableResource.java | 20 ++++++++++++++++++++
.../realtime/PinotLLCRealtimeSegmentManager.java | 9 +++++++++
2 files changed, 29 insertions(+)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
index ba03a6cc5c..7bf4ac4235 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
@@ -90,6 +90,26 @@ public class PinotRealtimeTableResource {
}
}
+ @POST
+ @Path("/tables/{tableName}/forceCommit")
+ @ApiOperation(value = "Force commit the current consuming segments",
+ notes = "Force commit the current segments in consuming state and restart consumption. "
+ + "This should be used after schema/table config changes. "
+ + "Please note that this is an asynchronous operation, "
+ + "and 200 response does not mean it has actually been done already")
+ public Response forceCommit(
+ @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) {
+ String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
+ validate(tableNameWithType);
+ try {
+ _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType);
+ return Response.ok().build();
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
+
+
@GET
@Path("/tables/{tableName}/pauseStatus")
@Produces(MediaType.APPLICATION_JSON)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index fa4f3063e0..5cec6b8c4b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -1420,6 +1420,15 @@ public class PinotLLCRealtimeSegmentManager {
}
}
+ /**
+ * Force commit the current segments in consuming state and restart consumption
+ */
+ public void forceCommit(String tableNameWithType) {
+ IdealState idealState = getIdealState(tableNameWithType);
+ Set<String> consumingSegments = findConsumingSegments(idealState);
+ sendForceCommitMessageToServers(tableNameWithType, consumingSegments);
+ }
+
/**
* Pause consumption on a table by
* 1) setting "isTablePaused" in ideal states to true and
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]