zentol commented on a change in pull request #12674:
URL: https://github.com/apache/flink/pull/12674#discussion_r440760231
##########
File path:
flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
##########
@@ -242,20 +253,41 @@ public void sendMessages(String topic, String... messages) throws IOException {
"--topic",
topic)) {
- try (PrintStream printStream = new PrintStream(autoClosableProcess.getProcess().getOutputStream(), true, StandardCharsets.UTF_8.name())) {
- for (final String message : messages) {
- printStream.println(message);
- }
- printStream.flush();
- }
+ sendMessagesAndWait(autoClosableProcess, messages);
+ }
+ }
- try {
- // wait until the process shuts down on it's own
- // this is the only reliable way to ensure the producer has actually processed our input
- autoClosableProcess.getProcess().waitFor();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ @Override
+ public void sendKeyedMessages(String topic, String keySeparator, String... messages) throws IOException {
+ try (AutoClosableProcess autoClosableProcess = AutoClosableProcess.runNonBlocking(
+ kafkaDir.resolve(Paths.get("bin", "kafka-console-producer.sh")).toString(),
Review comment:
It would be neat if we could further de-duplicate this; the only
difference between the two methods is the additional arguments.
I'm thinking of something like:
```
sendMessagesAndAwait(topic, messages, additionalArguments) {
try (... runNonBlocking(join(baseArgs, additionalArguments))) {
...
}
}
```
But I would be fine with moving this into a follow-up to get this in sooner.
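To make the `join(baseArgs, additionalArguments)` part of the sketch above concrete, here is a minimal, self-contained version. `ProducerCommand` and its `join` method are hypothetical names used only for illustration; they are not part of this PR or of `AutoClosableProcess`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper, for illustration only; not part of the PR. */
final class ProducerCommand {

    private ProducerCommand() {}

    /** Returns the base arguments followed by the additional ones, in order. */
    static String[] join(List<String> baseArgs, String... additionalArguments) {
        // Copy first so the caller's list is never modified.
        List<String> command = new ArrayList<>(baseArgs);
        command.addAll(Arrays.asList(additionalArguments));
        return command.toArray(new String[0]);
    }
}
```

`sendMessages` would then call the shared method with no additional arguments, while `sendKeyedMessages` would pass only its key-related options (presumably something like `"--property", "key.separator=" + keySeparator`; that is an assumption, since those lines are not visible in this hunk). The joined array could be handed to `runNonBlocking`, assuming it accepts the command as varargs the way the call in the diff suggests.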