This is an automated email from the ASF dual-hosted git repository.
sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 5ecca80cc8 Force commit consuming segments (#9197)
5ecca80cc8 is described below
commit 5ecca80cc8ac763d5c3ee256ea9504f60c581ecd
Author: Sajjad Moradi <[email protected]>
AuthorDate: Mon Aug 29 11:43:30 2022 -0700
Force commit consuming segments (#9197)
---
.../api/resources/PinotRealtimeTableResource.java | 20 ++++++++++++++++++++
.../realtime/PinotLLCRealtimeSegmentManager.java | 9 +++++++++
2 files changed, 29 insertions(+)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
index ba03a6cc5c..7bf4ac4235 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
@@ -90,6 +90,26 @@ public class PinotRealtimeTableResource {
}
}
+ @POST
+ @Path("/tables/{tableName}/forceCommit")
+ @ApiOperation(value = "Force commit the current consuming segments",
+ notes = "Force commit the current segments in consuming state and restart consumption. "
+ + "This should be used after schema/table config changes. "
+ + "Please note that this is an asynchronous operation, "
+ + "and 200 response does not mean it has actually been done already")
+ public Response forceCommit(
+ @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) {
+ String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
+ validate(tableNameWithType);
+ try {
+ _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType);
+ return Response.ok().build();
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
+
+
@GET
@Path("/tables/{tableName}/pauseStatus")
@Produces(MediaType.APPLICATION_JSON)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index fa4f3063e0..5cec6b8c4b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -1420,6 +1420,15 @@ public class PinotLLCRealtimeSegmentManager {
}
}
+ /**
+ * Force commit the current segments in consuming state and restart consumption
+ */
+ public void forceCommit(String tableNameWithType) {
+ IdealState idealState = getIdealState(tableNameWithType);
+ Set<String> consumingSegments = findConsumingSegments(idealState);
+ sendForceCommitMessageToServers(tableNameWithType, consumingSegments);
+ }
+
/**
* Pause consumption on a table by
* 1) setting "isTablePaused" in ideal states to true and
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]