This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e1e0637d989 [ErrorProne] Fix FormatStringShouldUsePlaceholders string 
concat when Preconditions is used (#37745)
e1e0637d989 is described below

commit e1e0637d989db41992ba8a8a186990bc219dd872
Author: Radosław Stankiewicz <[email protected]>
AuthorDate: Wed Mar 4 10:32:27 2026 +0100

    [ErrorProne] Fix FormatStringShouldUsePlaceholders string concat when 
Preconditions is used (#37745)
---
 .../flink/adapter/BeamFlinkDataStreamAdapter.java  |  3 +-
 .../functions/FlinkSideInputReader.java            |  2 +-
 .../dataflow/worker/windmill/state/RangeCoder.java |  4 +--
 ...renceCountingExecutableStageContextFactory.java |  2 +-
 .../apache/beam/runners/prism/PrismLocator.java    |  2 +-
 .../apache/beam/sdk/coders/BigDecimalCoder.java    |  4 +--
 .../apache/beam/sdk/schemas/AutoValueSchema.java   | 10 +++---
 .../apache/beam/sdk/schemas/JavaBeanSchema.java    | 10 +++---
 .../apache/beam/sdk/schemas/JavaFieldSchema.java   | 10 +++---
 .../beam/sdk/transforms/DoFnOutputReceivers.java   |  3 +-
 .../org/apache/beam/sdk/util/InstanceBuilder.java  | 24 ++++++-------
 .../beam/sdk/coders/BigDecimalCoderTest.java       |  1 -
 .../apache/beam/fn/harness/FnApiDoFnRunner.java    | 39 ++++++++++------------
 .../apache/beam/sdk/io/cassandra/CassandraIO.java  | 23 +++++--------
 .../io/gcp/bigquery/BeamRowToStorageApiProto.java  | 10 +++---
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java |  6 ++--
 .../beam/sdk/io/kafka/KafkaUnboundedSource.java    |  8 ++---
 .../sdk/io/kafka/WatchForKafkaTopicPartitions.java |  6 ++--
 18 files changed, 71 insertions(+), 96 deletions(-)

diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/BeamFlinkDataStreamAdapter.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/BeamFlinkDataStreamAdapter.java
index 435182909fb..c08668e1baf 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/BeamFlinkDataStreamAdapter.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/BeamFlinkDataStreamAdapter.java
@@ -228,7 +228,8 @@ public class BeamFlinkDataStreamAdapter {
       DataStream<InputT> flinkInput =
           Preconditions.checkStateNotNull(
               (DataStream<InputT>) inputMap.get(inputId),
-              "missing input referenced in proto: " + inputId);
+              "missing input referenced in proto: %s",
+              inputId);
       context.addDataStream(
           Iterables.getOnlyElement(transform.getOutputsMap().values()),
           flinkInput.process(
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
index 438688015c4..854b8e300cd 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
@@ -74,7 +74,7 @@ public class FlinkSideInputReader implements SideInputReader {
   public <T> @Nullable T get(PCollectionView<T> view, BoundedWindow window) {
     checkNotNull(view, "View passed to sideInput cannot be null");
     TupleTag<?> tag = view.getTagInternal();
-    checkNotNull(sideInputs.get(tag), "Side input for " + view + " not 
available.");
+    checkNotNull(sideInputs.get(tag), "Side input for %s not available.", 
view);
 
     Map<BoundedWindow, T> sideInputs =
         runtimeContext.getBroadcastVariableWithInitializer(
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/RangeCoder.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/RangeCoder.java
index 0e11531226f..e30376819f8 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/RangeCoder.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/RangeCoder.java
@@ -54,9 +54,9 @@ class RangeCoder<T extends Comparable<?>> extends 
StructuredCoder<Range<T>> {
   @Override
   public void encode(Range<T> value, OutputStream outStream) throws 
IOException {
     Preconditions.checkState(
-        value.lowerBoundType().equals(BoundType.CLOSED), "unexpected range " + 
value);
+        value.lowerBoundType().equals(BoundType.CLOSED), "unexpected range 
%s", value);
     Preconditions.checkState(
-        value.upperBoundType().equals(BoundType.OPEN), "unexpected range " + 
value);
+        value.upperBoundType().equals(BoundType.OPEN), "unexpected range %s", 
value);
     boundCoder.encode(value.hasLowerBound() ? value.lowerEndpoint() : null, 
outStream);
     boundCoder.encode(value.hasUpperBound() ? value.upperEndpoint() : null, 
outStream);
   }
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ReferenceCountingExecutableStageContextFactory.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ReferenceCountingExecutableStageContextFactory.java
index 54fccbceada..45b1b1942c4 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ReferenceCountingExecutableStageContextFactory.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ReferenceCountingExecutableStageContextFactory.java
@@ -115,7 +115,7 @@ public class ReferenceCountingExecutableStageContextFactory
   private void scheduleRelease(JobInfo jobInfo) {
     WrappedContext wrapper = getCache().get(jobInfo.jobId());
     Preconditions.checkState(
-        wrapper != null, "Releasing context for unknown job: " + 
jobInfo.jobId());
+        wrapper != null, "Releasing context for unknown job %s", 
jobInfo.jobId());
 
     PipelineOptions pipelineOptions =
         PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions());
diff --git 
a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismLocator.java
 
b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismLocator.java
index b32f03e78e6..0eef65cfc46 100644
--- 
a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismLocator.java
+++ 
b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismLocator.java
@@ -242,7 +242,7 @@ class PrismLocator {
   }
 
   private static String mustGetPropertyAsLowerCase(String name) {
-    return checkStateNotNull(System.getProperty(name), "System property: " + 
name + " not set")
+    return checkStateNotNull(System.getProperty(name), "System property: %s 
not set", name)
         .toLowerCase();
   }
 
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
index 6cfd6a435dd..1fa2c5165fd 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
@@ -55,7 +55,7 @@ public class BigDecimalCoder extends AtomicCoder<BigDecimal> {
   @Override
   public void encode(BigDecimal value, OutputStream outStream, Context context)
       throws IOException, CoderException {
-    checkNotNull(value, String.format("cannot encode a null %s", 
BigDecimal.class.getSimpleName()));
+    checkNotNull(value);
     VAR_INT_CODER.encode(value.scale(), outStream);
     BIG_INT_CODER.encode(value.unscaledValue(), outStream, context);
   }
@@ -108,7 +108,7 @@ public class BigDecimalCoder extends 
AtomicCoder<BigDecimal> {
    */
   @Override
   protected long getEncodedElementByteSize(BigDecimal value) throws Exception {
-    checkNotNull(value, String.format("cannot encode a null %s", 
BigDecimal.class.getSimpleName()));
+    checkNotNull(value);
     return VAR_INT_CODER.getEncodedElementByteSize(value.scale())
         + BIG_INT_CODER.getEncodedElementByteSize(value.unscaledValue());
   }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AutoValueSchema.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AutoValueSchema.java
index 7016242299a..49eebe13af5 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AutoValueSchema.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AutoValueSchema.java
@@ -79,12 +79,10 @@ public class AutoValueSchema extends 
GetterBasedSchemaProviderV2 {
       }
       Preconditions.checkState(
           number == i,
-          "Expected field number "
-              + i
-              + " for field + "
-              + type.getName()
-              + " instead got "
-              + number);
+          "Expected field number %s for field %s instead got %s",
+          i,
+          type.getName(),
+          number);
     }
   }
 
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java
index 14adf2f6603..9d5e2fa9c74 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java
@@ -81,12 +81,10 @@ public class JavaBeanSchema extends 
GetterBasedSchemaProviderV2 {
         }
         Preconditions.checkState(
             number == i,
-            "Expected field number "
-                + i
-                + " for field: "
-                + type.getName()
-                + " instead got "
-                + number);
+            "Expected field number %s for field %s instead got %s",
+            i,
+            type.getName(),
+            number);
       }
     }
 
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java
index 9a8eef2bf2c..3e0c9c87920 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java
@@ -103,12 +103,10 @@ public class JavaFieldSchema extends 
GetterBasedSchemaProviderV2 {
       }
       Preconditions.checkState(
           number == i,
-          "Expected field number "
-              + i
-              + " for field + "
-              + type.getName()
-              + " instead got "
-              + number);
+          "Expected field number %s for field %s instead got %s",
+          i,
+          type.getName(),
+          number);
     }
   }
 
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java
index fee19810c15..e2c0825e027 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java
@@ -169,7 +169,8 @@ public class DoFnOutputReceivers {
       checkStateNotNull(outputCoder, "No output tag for %s ", tag);
       checkState(
           outputCoder instanceof SchemaCoder,
-          "Output with tag " + tag + " must have a schema in order to call 
getRowReceiver");
+          "Output with tag %s must have a schema in order to call 
getRowReceiver",
+          tag);
       return DoFnOutputReceivers.rowReceiver(
           context, builderSupplier, tag, (SchemaCoder<T>) outputCoder);
     }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/InstanceBuilder.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/InstanceBuilder.java
index 467f5ab61a3..01dc8de1cc2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/InstanceBuilder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/InstanceBuilder.java
@@ -195,19 +195,16 @@ public class InstanceBuilder<T> {
 
       checkState(
           Modifier.isStatic(method.getModifiers()),
-          "Factory method must be a static method for "
-              + factoryClass.getName()
-              + "#"
-              + method.getName());
+          "Factory method %s#%s must be a static method",
+          factoryClass.getName(),
+          method.getName());
 
       checkState(
           type.isAssignableFrom(method.getReturnType()),
-          "Return type for "
-              + factoryClass.getName()
-              + "#"
-              + method.getName()
-              + " must be assignable to "
-              + type.getSimpleName());
+          "Return type for %s#%s must be assignable to %s",
+          factoryClass.getName(),
+          method.getName(),
+          type.getSimpleName());
 
       if (!method.isAccessible()) {
         method.setAccessible(true);
@@ -248,10 +245,9 @@ public class InstanceBuilder<T> {
 
       checkState(
           type.isAssignableFrom(factoryClass),
-          "Instance type "
-              + factoryClass.getName()
-              + " must be assignable to "
-              + type.getSimpleName());
+          "Instance type %s must be assignable to %s",
+          factoryClass.getName(),
+          type.getSimpleName());
 
       if (!constructor.isAccessible()) {
         constructor.setAccessible(true);
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java
index 063ca9517a2..f97c7fe75d8 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java
@@ -109,7 +109,6 @@ public class BigDecimalCoderTest {
   @Test
   public void encodeNullThrowsException() throws Exception {
     thrown.expect(NullPointerException.class);
-    thrown.expectMessage("cannot encode a null BigDecimal");
 
     CoderUtils.encodeToBase64(TEST_CODER, null);
   }
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 78aceeaab19..052ff324490 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -2087,9 +2087,8 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
     public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
       checkState(
           mainOutputSchemaCoder != null,
-          "Output with tag "
-              + mainOutputTag
-              + " must have a schema in order to call getRowReceiver");
+          "Output with tag %s must have a schema in order to call 
getRowReceiver",
+          mainOutputTag);
       return mainRowOutputReceiver;
     }
 
@@ -2130,9 +2129,8 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
             if (tag == null || mainOutputTag.equals(tag)) {
               checkState(
                   mainOutputSchemaCoder != null,
-                  "Output with tag "
-                      + mainOutputTag
-                      + " must have a schema in order to call getRowReceiver");
+                  "Output with tag %s must have a schema in order to call 
getRowReceiver",
+                  mainOutputTag);
               return mainRowOutputReceiver;
             }
 
@@ -2140,7 +2138,8 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
             checkState(outputCoder != null, "No output tag for %s", tag);
             checkState(
                 outputCoder instanceof SchemaCoder,
-                "Output with tag " + tag + " must have a schema in order to 
call getRowReceiver");
+                "Output with tag %s must have a schema in order to call 
getRowReceiver",
+                tag);
             return new OutputReceiver<Row>() {
               private SerializableFunction<Row, T> fromRowFunction =
                   ((SchemaCoder) outputCoder).getFromRowFunction();
@@ -2426,9 +2425,8 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
     public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
       checkState(
           mainOutputSchemaCoder != null,
-          "Output with tag "
-              + mainOutputTag
-              + " must have a schema in order to call getRowReceiver");
+          "Output with tag %s must have a schema in order to call 
getRowReceiver",
+          mainOutputTag);
       return mainRowOutputReceiver;
     }
 
@@ -2466,9 +2464,8 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
             if (tag == null || mainOutputTag.equals(tag)) {
               checkState(
                   mainOutputSchemaCoder != null,
-                  "Output with tag "
-                      + mainOutputTag
-                      + " must have a schema in order to call getRowReceiver");
+                  "Output with tag %s must have a schema in order to call 
getRowReceiver",
+                  mainOutputTag);
               return mainRowOutputReceiver;
             }
 
@@ -2476,7 +2473,8 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
             checkState(outputCoder != null, "No output tag for %s", tag);
             checkState(
                 outputCoder instanceof SchemaCoder,
-                "Output with tag " + tag + " must have a schema in order to 
call getRowReceiver");
+                "Output with tag %s must have a schema in order to call 
getRowReceiver",
+                tag);
             return new OutputReceiver<Row>() {
               private SerializableFunction<Row, T> fromRowFunction =
                   ((SchemaCoder) outputCoder).getFromRowFunction();
@@ -2736,9 +2734,8 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
     public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
       checkState(
           mainOutputSchemaCoder != null,
-          "Output with tag "
-              + mainOutputTag
-              + " must have a schema in order to call getRowReceiver");
+          "Output with tag %s must have a schema in order to call 
getRowReceiver",
+          mainOutputTag);
       return mainRowOutputReceiver;
     }
 
@@ -2776,9 +2773,8 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
             if (tag == null || mainOutputTag.equals(tag)) {
               checkState(
                   mainOutputSchemaCoder != null,
-                  "Output with tag "
-                      + mainOutputTag
-                      + " must have a schema in order to call getRowReceiver");
+                  "Output with tag %s must have a schema in order to call 
getRowReceiver",
+                  mainOutputTag);
               return mainRowOutputReceiver;
             }
 
@@ -2786,7 +2782,8 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
             checkState(outputCoder != null, "No output tag for %s", tag);
             checkState(
                 outputCoder instanceof SchemaCoder,
-                "Output with tag " + tag + " must have a schema in order to 
call getRowReceiver");
+                "Output with tag %s must have a schema in order to call 
getRowReceiver",
+                tag);
             return new OutputReceiver<Row>() {
               private SerializableFunction<Row, T> fromRowFunction =
                   ((SchemaCoder) outputCoder).getFromRowFunction();
diff --git 
a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
 
b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
index 1429253d194..28e63aeac58 100644
--- 
a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
+++ 
b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
@@ -770,27 +770,20 @@ public class CassandraIO {
     public void validate(PipelineOptions pipelineOptions) {
       checkState(
           hosts() != null,
-          "CassandraIO."
-              + getMutationTypeName()
-              + "() requires a list of hosts to be set via withHosts(hosts)");
+          "CassandraIO.%s() requires a list of hosts to be set via 
withHosts(hosts)",
+          getMutationTypeName());
       checkState(
           port() != null,
-          "CassandraIO."
-              + getMutationTypeName()
-              + "() requires a "
-              + "valid port number to be set via withPort(port)");
+          "CassandraIO.%s() requires a valid port number to be set via 
withPort(port)",
+          getMutationTypeName());
       checkState(
           keyspace() != null,
-          "CassandraIO."
-              + getMutationTypeName()
-              + "() requires a keyspace to be set via "
-              + "withKeyspace(keyspace)");
+          "CassandraIO.%s() requires a keyspace to be set via 
withKeyspace(keyspace)",
+          getMutationTypeName());
       checkState(
           entity() != null,
-          "CassandraIO."
-              + getMutationTypeName()
-              + "() requires an entity to be set via "
-              + "withEntity(entity)");
+          "CassandraIO.%s() requires an entity to be set via 
withEntity(entity)",
+          getMutationTypeName());
     }
 
     @Override
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
index d940ff8dd7f..a4d20707304 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
@@ -227,13 +227,13 @@ public class BeamRowToStorageApiProto {
         TypeName containedTypeName =
             Preconditions.checkNotNull(
                 elementType.getTypeName(),
-                "Null type name found in contained type at " + 
field.getName());
+                "Null type name found in contained type at %s",
+                field.getName());
         Preconditions.checkState(
             !(containedTypeName.isCollectionType() || 
containedTypeName.isMapType()),
-            "Nested container types are not supported by BigQuery. Field "
-                + field.getName()
-                + " contains a type "
-                + containedTypeName.name());
+            "Nested container types are not supported by BigQuery. Field %s 
contains a type %s",
+            field.getName(),
+            containedTypeName.name());
         TableFieldSchema elementFieldSchema =
             fieldDescriptorFromBeamField(Field.of(field.getName(), 
elementType));
         builder = builder.setType(elementFieldSchema.getType());
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 02d14b745fe..d247886cc67 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -2132,10 +2132,8 @@ public class KafkaIO {
                 if (logTopicVerification == null || !logTopicVerification) {
                   checkState(
                       partitionInfoList != null && 
!partitionInfoList.isEmpty(),
-                      "Could not find any partitions info for topic "
-                          + topic
-                          + ". Please check Kafka configuration and make sure "
-                          + "that provided topics exist.");
+                      "Could not find any partitions info for topic %s. Please 
check Kafka configuration and make sure that provided topics exist.",
+                      topic);
                 } else {
                   LOG.warn(
                       "Could not find any partitions info for topic {}. Please 
check Kafka configuration "
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
index 37fc6378e2d..01bbe61ab9a 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
@@ -129,11 +129,9 @@ class KafkaUnboundedSource<K, V> extends 
UnboundedSource<KafkaRecord<K, V>, Kafk
               for (Integer p : providedPartitions) {
                 checkState(
                     partitionsForTopic.contains(p),
-                    "Partition "
-                        + p
-                        + " does not exist for topic "
-                        + providedTopic
-                        + ". Please check Kafka configuration.");
+                    "Partition %s does not exist for topic %s. Please check 
Kafka configuration.",
+                    p,
+                    providedTopic);
               }
             } else {
               for (Integer p : providedPartitions) {
diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java
index 65ece98d618..490faafb22f 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java
@@ -193,10 +193,8 @@ class WatchForKafkaTopicPartitions extends 
PTransform<PBegin, PCollection<KafkaS
           List<PartitionInfo> partitionInfoList = 
kafkaConsumer.partitionsFor(topic);
           checkState(
               partitionInfoList != null && !partitionInfoList.isEmpty(),
-              "Could not find any partitions info for topic "
-                  + topic
-                  + ". Please check Kafka configuration and make sure "
-                  + "that provided topics exist.");
+              "Could not find any partitions info for topic %s. Please check 
Kafka configuration and make sure that provided topics exist.",
+              topic);
           for (PartitionInfo partition : partitionInfoList) {
             current.add(new TopicPartition(topic, partition.partition()));
           }

Reply via email to