This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e1e0637d989 [ErrorProne] Fix FormatStringShouldUsePlaceholders string
concat when Preconditions is used (#37745)
e1e0637d989 is described below
commit e1e0637d989db41992ba8a8a186990bc219dd872
Author: Radosław Stankiewicz <[email protected]>
AuthorDate: Wed Mar 4 10:32:27 2026 +0100
[ErrorProne] Fix FormatStringShouldUsePlaceholders string concat when
Preconditions is used (#37745)
---
.../flink/adapter/BeamFlinkDataStreamAdapter.java | 3 +-
.../functions/FlinkSideInputReader.java | 2 +-
.../dataflow/worker/windmill/state/RangeCoder.java | 4 +--
...renceCountingExecutableStageContextFactory.java | 2 +-
.../apache/beam/runners/prism/PrismLocator.java | 2 +-
.../apache/beam/sdk/coders/BigDecimalCoder.java | 4 +--
.../apache/beam/sdk/schemas/AutoValueSchema.java | 10 +++---
.../apache/beam/sdk/schemas/JavaBeanSchema.java | 10 +++---
.../apache/beam/sdk/schemas/JavaFieldSchema.java | 10 +++---
.../beam/sdk/transforms/DoFnOutputReceivers.java | 3 +-
.../org/apache/beam/sdk/util/InstanceBuilder.java | 24 ++++++-------
.../beam/sdk/coders/BigDecimalCoderTest.java | 1 -
.../apache/beam/fn/harness/FnApiDoFnRunner.java | 39 ++++++++++------------
.../apache/beam/sdk/io/cassandra/CassandraIO.java | 23 +++++--------
.../io/gcp/bigquery/BeamRowToStorageApiProto.java | 10 +++---
.../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 6 ++--
.../beam/sdk/io/kafka/KafkaUnboundedSource.java | 8 ++---
.../sdk/io/kafka/WatchForKafkaTopicPartitions.java | 6 ++--
18 files changed, 71 insertions(+), 96 deletions(-)
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/BeamFlinkDataStreamAdapter.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/BeamFlinkDataStreamAdapter.java
index 435182909fb..c08668e1baf 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/BeamFlinkDataStreamAdapter.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/BeamFlinkDataStreamAdapter.java
@@ -228,7 +228,8 @@ public class BeamFlinkDataStreamAdapter {
DataStream<InputT> flinkInput =
Preconditions.checkStateNotNull(
(DataStream<InputT>) inputMap.get(inputId),
- "missing input referenced in proto: " + inputId);
+ "missing input referenced in proto: %s",
+ inputId);
context.addDataStream(
Iterables.getOnlyElement(transform.getOutputsMap().values()),
flinkInput.process(
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
index 438688015c4..854b8e300cd 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
@@ -74,7 +74,7 @@ public class FlinkSideInputReader implements SideInputReader {
public <T> @Nullable T get(PCollectionView<T> view, BoundedWindow window) {
checkNotNull(view, "View passed to sideInput cannot be null");
TupleTag<?> tag = view.getTagInternal();
- checkNotNull(sideInputs.get(tag), "Side input for " + view + " not
available.");
+ checkNotNull(sideInputs.get(tag), "Side input for %s not available.",
view);
Map<BoundedWindow, T> sideInputs =
runtimeContext.getBroadcastVariableWithInitializer(
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/RangeCoder.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/RangeCoder.java
index 0e11531226f..e30376819f8 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/RangeCoder.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/RangeCoder.java
@@ -54,9 +54,9 @@ class RangeCoder<T extends Comparable<?>> extends
StructuredCoder<Range<T>> {
@Override
public void encode(Range<T> value, OutputStream outStream) throws
IOException {
Preconditions.checkState(
- value.lowerBoundType().equals(BoundType.CLOSED), "unexpected range " +
value);
+ value.lowerBoundType().equals(BoundType.CLOSED), "unexpected range
%s", value);
Preconditions.checkState(
- value.upperBoundType().equals(BoundType.OPEN), "unexpected range " +
value);
+ value.upperBoundType().equals(BoundType.OPEN), "unexpected range %s",
value);
boundCoder.encode(value.hasLowerBound() ? value.lowerEndpoint() : null,
outStream);
boundCoder.encode(value.hasUpperBound() ? value.upperEndpoint() : null,
outStream);
}
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ReferenceCountingExecutableStageContextFactory.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ReferenceCountingExecutableStageContextFactory.java
index 54fccbceada..45b1b1942c4 100644
---
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ReferenceCountingExecutableStageContextFactory.java
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ReferenceCountingExecutableStageContextFactory.java
@@ -115,7 +115,7 @@ public class ReferenceCountingExecutableStageContextFactory
private void scheduleRelease(JobInfo jobInfo) {
WrappedContext wrapper = getCache().get(jobInfo.jobId());
Preconditions.checkState(
- wrapper != null, "Releasing context for unknown job: " +
jobInfo.jobId());
+ wrapper != null, "Releasing context for unknown job %s",
jobInfo.jobId());
PipelineOptions pipelineOptions =
PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions());
diff --git
a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismLocator.java
b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismLocator.java
index b32f03e78e6..0eef65cfc46 100644
---
a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismLocator.java
+++
b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismLocator.java
@@ -242,7 +242,7 @@ class PrismLocator {
}
private static String mustGetPropertyAsLowerCase(String name) {
- return checkStateNotNull(System.getProperty(name), "System property: " +
name + " not set")
+ return checkStateNotNull(System.getProperty(name), "System property: %s
not set", name)
.toLowerCase();
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
index 6cfd6a435dd..1fa2c5165fd 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
@@ -55,7 +55,7 @@ public class BigDecimalCoder extends AtomicCoder<BigDecimal> {
@Override
public void encode(BigDecimal value, OutputStream outStream, Context context)
throws IOException, CoderException {
- checkNotNull(value, String.format("cannot encode a null %s",
BigDecimal.class.getSimpleName()));
+ checkNotNull(value);
VAR_INT_CODER.encode(value.scale(), outStream);
BIG_INT_CODER.encode(value.unscaledValue(), outStream, context);
}
@@ -108,7 +108,7 @@ public class BigDecimalCoder extends
AtomicCoder<BigDecimal> {
*/
@Override
protected long getEncodedElementByteSize(BigDecimal value) throws Exception {
- checkNotNull(value, String.format("cannot encode a null %s",
BigDecimal.class.getSimpleName()));
+ checkNotNull(value);
return VAR_INT_CODER.getEncodedElementByteSize(value.scale())
+ BIG_INT_CODER.getEncodedElementByteSize(value.unscaledValue());
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AutoValueSchema.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AutoValueSchema.java
index 7016242299a..49eebe13af5 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AutoValueSchema.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AutoValueSchema.java
@@ -79,12 +79,10 @@ public class AutoValueSchema extends
GetterBasedSchemaProviderV2 {
}
Preconditions.checkState(
number == i,
- "Expected field number "
- + i
- + " for field + "
- + type.getName()
- + " instead got "
- + number);
+ "Expected field number %s for field %s instead got %s",
+ i,
+ type.getName(),
+ number);
}
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java
index 14adf2f6603..9d5e2fa9c74 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java
@@ -81,12 +81,10 @@ public class JavaBeanSchema extends
GetterBasedSchemaProviderV2 {
}
Preconditions.checkState(
number == i,
- "Expected field number "
- + i
- + " for field: "
- + type.getName()
- + " instead got "
- + number);
+ "Expected field number %s for field %s instead got %s",
+ i,
+ type.getName(),
+ number);
}
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java
index 9a8eef2bf2c..3e0c9c87920 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java
@@ -103,12 +103,10 @@ public class JavaFieldSchema extends
GetterBasedSchemaProviderV2 {
}
Preconditions.checkState(
number == i,
- "Expected field number "
- + i
- + " for field + "
- + type.getName()
- + " instead got "
- + number);
+ "Expected field number %s for field %s instead got %s",
+ i,
+ type.getName(),
+ number);
}
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java
index fee19810c15..e2c0825e027 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java
@@ -169,7 +169,8 @@ public class DoFnOutputReceivers {
checkStateNotNull(outputCoder, "No output tag for %s ", tag);
checkState(
outputCoder instanceof SchemaCoder,
- "Output with tag " + tag + " must have a schema in order to call
getRowReceiver");
+ "Output with tag %s must have a schema in order to call
getRowReceiver",
+ tag);
return DoFnOutputReceivers.rowReceiver(
context, builderSupplier, tag, (SchemaCoder<T>) outputCoder);
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/InstanceBuilder.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/InstanceBuilder.java
index 467f5ab61a3..01dc8de1cc2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/InstanceBuilder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/InstanceBuilder.java
@@ -195,19 +195,16 @@ public class InstanceBuilder<T> {
checkState(
Modifier.isStatic(method.getModifiers()),
- "Factory method must be a static method for "
- + factoryClass.getName()
- + "#"
- + method.getName());
+ "Factory method %s#%s must be a static method",
+ factoryClass.getName(),
+ method.getName());
checkState(
type.isAssignableFrom(method.getReturnType()),
- "Return type for "
- + factoryClass.getName()
- + "#"
- + method.getName()
- + " must be assignable to "
- + type.getSimpleName());
+ "Return type for %s#%s must be assignable to %s",
+ factoryClass.getName(),
+ method.getName(),
+ type.getSimpleName());
if (!method.isAccessible()) {
method.setAccessible(true);
@@ -248,10 +245,9 @@ public class InstanceBuilder<T> {
checkState(
type.isAssignableFrom(factoryClass),
- "Instance type "
- + factoryClass.getName()
- + " must be assignable to "
- + type.getSimpleName());
+ "Instance type %s must be assignable to %s",
+ factoryClass.getName(),
+ type.getSimpleName());
if (!constructor.isAccessible()) {
constructor.setAccessible(true);
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java
index 063ca9517a2..f97c7fe75d8 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java
@@ -109,7 +109,6 @@ public class BigDecimalCoderTest {
@Test
public void encodeNullThrowsException() throws Exception {
thrown.expect(NullPointerException.class);
- thrown.expectMessage("cannot encode a null BigDecimal");
CoderUtils.encodeToBase64(TEST_CODER, null);
}
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 78aceeaab19..052ff324490 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -2087,9 +2087,8 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
checkState(
mainOutputSchemaCoder != null,
- "Output with tag "
- + mainOutputTag
- + " must have a schema in order to call getRowReceiver");
+ "Output with tag %s must have a schema in order to call
getRowReceiver",
+ mainOutputTag);
return mainRowOutputReceiver;
}
@@ -2130,9 +2129,8 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
if (tag == null || mainOutputTag.equals(tag)) {
checkState(
mainOutputSchemaCoder != null,
- "Output with tag "
- + mainOutputTag
- + " must have a schema in order to call getRowReceiver");
+ "Output with tag %s must have a schema in order to call
getRowReceiver",
+ mainOutputTag);
return mainRowOutputReceiver;
}
@@ -2140,7 +2138,8 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
checkState(outputCoder != null, "No output tag for %s", tag);
checkState(
outputCoder instanceof SchemaCoder,
- "Output with tag " + tag + " must have a schema in order to
call getRowReceiver");
+ "Output with tag %s must have a schema in order to call
getRowReceiver",
+ tag);
return new OutputReceiver<Row>() {
private SerializableFunction<Row, T> fromRowFunction =
((SchemaCoder) outputCoder).getFromRowFunction();
@@ -2426,9 +2425,8 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
checkState(
mainOutputSchemaCoder != null,
- "Output with tag "
- + mainOutputTag
- + " must have a schema in order to call getRowReceiver");
+ "Output with tag %s must have a schema in order to call
getRowReceiver",
+ mainOutputTag);
return mainRowOutputReceiver;
}
@@ -2466,9 +2464,8 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
if (tag == null || mainOutputTag.equals(tag)) {
checkState(
mainOutputSchemaCoder != null,
- "Output with tag "
- + mainOutputTag
- + " must have a schema in order to call getRowReceiver");
+ "Output with tag %s must have a schema in order to call
getRowReceiver",
+ mainOutputTag);
return mainRowOutputReceiver;
}
@@ -2476,7 +2473,8 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
checkState(outputCoder != null, "No output tag for %s", tag);
checkState(
outputCoder instanceof SchemaCoder,
- "Output with tag " + tag + " must have a schema in order to
call getRowReceiver");
+ "Output with tag %s must have a schema in order to call
getRowReceiver",
+ tag);
return new OutputReceiver<Row>() {
private SerializableFunction<Row, T> fromRowFunction =
((SchemaCoder) outputCoder).getFromRowFunction();
@@ -2736,9 +2734,8 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
checkState(
mainOutputSchemaCoder != null,
- "Output with tag "
- + mainOutputTag
- + " must have a schema in order to call getRowReceiver");
+ "Output with tag %s must have a schema in order to call
getRowReceiver",
+ mainOutputTag);
return mainRowOutputReceiver;
}
@@ -2776,9 +2773,8 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
if (tag == null || mainOutputTag.equals(tag)) {
checkState(
mainOutputSchemaCoder != null,
- "Output with tag "
- + mainOutputTag
- + " must have a schema in order to call getRowReceiver");
+ "Output with tag %s must have a schema in order to call
getRowReceiver",
+ mainOutputTag);
return mainRowOutputReceiver;
}
@@ -2786,7 +2782,8 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
checkState(outputCoder != null, "No output tag for %s", tag);
checkState(
outputCoder instanceof SchemaCoder,
- "Output with tag " + tag + " must have a schema in order to
call getRowReceiver");
+ "Output with tag %s must have a schema in order to call
getRowReceiver",
+ tag);
return new OutputReceiver<Row>() {
private SerializableFunction<Row, T> fromRowFunction =
((SchemaCoder) outputCoder).getFromRowFunction();
diff --git
a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
index 1429253d194..28e63aeac58 100644
---
a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
+++
b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
@@ -770,27 +770,20 @@ public class CassandraIO {
public void validate(PipelineOptions pipelineOptions) {
checkState(
hosts() != null,
- "CassandraIO."
- + getMutationTypeName()
- + "() requires a list of hosts to be set via withHosts(hosts)");
+ "CassandraIO.%s() requires a list of hosts to be set via
withHosts(hosts)",
+ getMutationTypeName());
checkState(
port() != null,
- "CassandraIO."
- + getMutationTypeName()
- + "() requires a "
- + "valid port number to be set via withPort(port)");
+ "CassandraIO.%s() requires a valid port number to be set via
withPort(port)",
+ getMutationTypeName());
checkState(
keyspace() != null,
- "CassandraIO."
- + getMutationTypeName()
- + "() requires a keyspace to be set via "
- + "withKeyspace(keyspace)");
+ "CassandraIO.%s() requires a keyspace to be set via
withKeyspace(keyspace)",
+ getMutationTypeName());
checkState(
entity() != null,
- "CassandraIO."
- + getMutationTypeName()
- + "() requires an entity to be set via "
- + "withEntity(entity)");
+ "CassandraIO.%s() requires an entity to be set via
withEntity(entity)",
+ getMutationTypeName());
}
@Override
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
index d940ff8dd7f..a4d20707304 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
@@ -227,13 +227,13 @@ public class BeamRowToStorageApiProto {
TypeName containedTypeName =
Preconditions.checkNotNull(
elementType.getTypeName(),
- "Null type name found in contained type at " +
field.getName());
+ "Null type name found in contained type at %s",
+ field.getName());
Preconditions.checkState(
!(containedTypeName.isCollectionType() ||
containedTypeName.isMapType()),
- "Nested container types are not supported by BigQuery. Field "
- + field.getName()
- + " contains a type "
- + containedTypeName.name());
+ "Nested container types are not supported by BigQuery. Field %s
contains a type %s",
+ field.getName(),
+ containedTypeName.name());
TableFieldSchema elementFieldSchema =
fieldDescriptorFromBeamField(Field.of(field.getName(),
elementType));
builder = builder.setType(elementFieldSchema.getType());
diff --git
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 02d14b745fe..d247886cc67 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -2132,10 +2132,8 @@ public class KafkaIO {
if (logTopicVerification == null || !logTopicVerification) {
checkState(
partitionInfoList != null &&
!partitionInfoList.isEmpty(),
- "Could not find any partitions info for topic "
- + topic
- + ". Please check Kafka configuration and make sure "
- + "that provided topics exist.");
+ "Could not find any partitions info for topic %s. Please
check Kafka configuration and make sure that provided topics exist.",
+ topic);
} else {
LOG.warn(
"Could not find any partitions info for topic {}. Please
check Kafka configuration "
diff --git
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
index 37fc6378e2d..01bbe61ab9a 100644
---
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
+++
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
@@ -129,11 +129,9 @@ class KafkaUnboundedSource<K, V> extends
UnboundedSource<KafkaRecord<K, V>, Kafk
for (Integer p : providedPartitions) {
checkState(
partitionsForTopic.contains(p),
- "Partition "
- + p
- + " does not exist for topic "
- + providedTopic
- + ". Please check Kafka configuration.");
+ "Partition %s does not exist for topic %s. Please check
Kafka configuration.",
+ p,
+ providedTopic);
}
} else {
for (Integer p : providedPartitions) {
diff --git
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java
index 65ece98d618..490faafb22f 100644
---
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java
+++
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java
@@ -193,10 +193,8 @@ class WatchForKafkaTopicPartitions extends
PTransform<PBegin, PCollection<KafkaS
List<PartitionInfo> partitionInfoList =
kafkaConsumer.partitionsFor(topic);
checkState(
partitionInfoList != null && !partitionInfoList.isEmpty(),
- "Could not find any partitions info for topic "
- + topic
- + ". Please check Kafka configuration and make sure "
- + "that provided topics exist.");
+ "Could not find any partitions info for topic %s. Please check
Kafka configuration and make sure that provided topics exist.",
+ topic);
for (PartitionInfo partition : partitionInfoList) {
current.add(new TopicPartition(topic, partition.partition()));
}