sd4324530 commented on PR #4368: URL: https://github.com/apache/flink-cdc/pull/4368#issuecomment-4418952995
> > When I ran the e2e test, I found that the default value of the config option `record.size.max.bytes` was incorrect.
> >
> > The value of `record.size.max.bytes` cannot be greater than the value of `batch.size.max.bytes`.
>
> @sd4324530 Could you please explain the reason in more detail?

@lvyanquan In the current implementation of `ElasticsearchDataSink`, the `EventSinkProvider` for ES6 and ES7 uses the `ElasticsearchSink` provided by `flink-connector-elasticsearch`, which we will call Solution A.[1] For ES8, we use our own implementation, `Elasticsearch8AsyncSink`, which we will call Solution B.[2]

Differences in the implementation of `SinkWriter`:

- Solution A uses the `ElasticsearchWriter` class provided by `flink-connector-elasticsearch`, which implements the `SinkWriter` interface.[3]
- Solution B uses our own `Elasticsearch8AsyncWriter`, which, unlike Solution A, extends the base class `AsyncSinkWriter`.[4]

How the two solutions treat `batch.size.max.bytes` and `record.size.max.bytes`:

- Solution A: the `EventSinkProvider` does not use these two config options.
- Solution B: the `EventSinkProvider` uses both config options, and they are ultimately validated in the constructor of `Elasticsearch8AsyncWriter` (the check comes from the base class `AsyncSinkWriter`).
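
To illustrate the validity check described above, here is a minimal standalone sketch. It is not the Flink source: the class `SizeOptionCheck`, the methods `validate` and `isValid`, and the byte values are hypothetical and only model the rule that the per-flush batch size must be at least as large as the single-record size.

```java
public class SizeOptionCheck {

    /**
     * Hypothetical stand-in for the constructor precondition in AsyncSinkWriter.
     * Rejects configurations where a single record may be larger than a whole batch.
     */
    static void validate(long maxBatchSizeInBytes, long maxRecordSizeInBytes) {
        if (maxBatchSizeInBytes < maxRecordSizeInBytes) {
            throw new IllegalArgumentException(
                    "The maximum allowed size in bytes per flush must be greater than or equal to"
                            + " the maximum allowed size in bytes of a single record.");
        }
    }

    /** Convenience wrapper: true if the pair of values passes the check. */
    static boolean isValid(long maxBatchSizeInBytes, long maxRecordSizeInBytes) {
        try {
            validate(maxBatchSizeInBytes, maxRecordSizeInBytes);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        long oneMiB = 1024L * 1024L;
        long fiveMiB = 5L * oneMiB;

        // Illustrative values only; these are not the connector's defaults.
        // batch.size.max.bytes >= record.size.max.bytes: accepted.
        System.out.println(isValid(fiveMiB, oneMiB));
        // record.size.max.bytes > batch.size.max.bytes: rejected.
        System.out.println(isValid(oneMiB, fiveMiB));
    }
}
```

Under this rule, a default for `record.size.max.bytes` that exceeds the default for `batch.size.max.bytes` would fail as soon as the ES8 writer is constructed, which would match the failure seen in the e2e test.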

Currently, `batch.size.max.bytes` and `record.size.max.bytes` only apply to ES8 and must comply with `AsyncSinkWriter`, which states that `the maximum allowed size in bytes per flush must be greater than or equal to the maximum allowed size in bytes of a single record`.[5][6]

[1] https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSink.java#L69-L72
[2] https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSink.java#L73-L74
[3] https://github.com/apache/flink-connector-elasticsearch/blob/v3.1.0/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java#L55
[4] https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncWriter.java#L56
[5] https://github.com/apache/flink/blob/release-1.20.3/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L363-L366
[6] https://github.com/apache/flink/blob/release-2.2.0/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L262-L265

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
