This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1284d769892 Fix various issues in SplunkIO  (#28825)
1284d769892 is described below

commit 1284d769892ddbba13be2fca8daa8a387c71c122
Author: Pranav Bhandari <[email protected]>
AuthorDate: Fri Oct 20 16:19:04 2023 -0400

    Fix various issues in SplunkIO  (#28825)
    
    * Fix GZIP compression in HttpEventPublisher.
    
    * Add checks to make sure the provided URL is valid.
    
    * Fix issue with DefaultCoder in AutoValue generated classes.
    
    * Add support for Splunk `fields` metadata.
    
    Also fix Coder issues for SplunkEvent.
    
    * Address comments.
---
 .../org/apache/beam/sdk/coders/CoderProviders.java |   7 +-
 .../org/apache/beam/sdk/coders/DefaultCoder.java   |   8 +
 sdks/java/io/splunk/build.gradle                   |   1 +
 .../beam/sdk/io/splunk/HttpEventPublisher.java     |   9 +
 .../org/apache/beam/sdk/io/splunk/SplunkEvent.java |  21 +-
 .../beam/sdk/io/splunk/SplunkEventCoder.java       | 206 +++++++++++++++++++
 .../beam/sdk/io/splunk/SplunkEventWriter.java      |  65 +++++-
 .../org/apache/beam/sdk/io/splunk/SplunkIO.java    |   1 -
 .../beam/sdk/io/splunk/SplunkEventCoderTest.java   | 228 +++++++++++++++++++++
 .../apache/beam/sdk/io/splunk/SplunkEventTest.java |   5 +
 .../beam/sdk/io/splunk/SplunkEventWriterTest.java  |  63 +++++-
 .../apache/beam/sdk/io/splunk/SplunkIOTest.java    |  12 +-
 12 files changed, 603 insertions(+), 23 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviders.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviders.java
index 8e47f4f2bc9..e0a3199d0c6 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviders.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviders.java
@@ -178,7 +178,12 @@ public final class CoderProviders {
     @Override
     public <T> Coder<T> coderFor(TypeDescriptor<T> type, List<? extends 
Coder<?>> componentCoders)
         throws CannotProvideCoderException {
-      if (!this.type.equals(type)) {
+      boolean isTypeEqual = this.type.equals(type);
+      boolean isAutoValueConcrete =
+          type.getRawType().getName().contains("AutoValue_")
+              && this.type.getRawType().isAssignableFrom(type.getRawType());
+
+      if (!isTypeEqual && !isAutoValueConcrete) {
         throw new CannotProvideCoderException(
             String.format(
                 "Unable to provide coder for %s, this factory can only provide 
coders for %s",
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DefaultCoder.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DefaultCoder.java
index 52718fcde2a..782a77cde68 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DefaultCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DefaultCoder.java
@@ -88,6 +88,14 @@ public @interface DefaultCoder {
 
         Class<?> clazz = typeDescriptor.getRawType();
         DefaultCoder defaultAnnotation = 
clazz.getAnnotation(DefaultCoder.class);
+        if (defaultAnnotation == null) {
+          // check if the superclass has DefaultCoder annotation if the class 
is generated using
+          // AutoValue
+          if (clazz.getName().contains("AutoValue_")) {
+            clazz = clazz.getSuperclass();
+            defaultAnnotation = clazz.getAnnotation(DefaultCoder.class);
+          }
+        }
         if (defaultAnnotation == null) {
           throw new CannotProvideCoderException(
               String.format("Class %s does not have a @DefaultCoder 
annotation.", clazz.getName()));
diff --git a/sdks/java/io/splunk/build.gradle b/sdks/java/io/splunk/build.gradle
index dd1b15e10dd..41a7a409e89 100644
--- a/sdks/java/io/splunk/build.gradle
+++ b/sdks/java/io/splunk/build.gradle
@@ -37,6 +37,7 @@ dependencies {
     implementation library.java.joda_time
     implementation library.java.slf4j_api
     implementation library.java.vendored_guava_32_1_2_jre
+    implementation library.java.commons_io
     testImplementation library.java.junit
     testImplementation group: 'org.mock-server', name: 
'mockserver-junit-rule', version: '5.10.0'
     testImplementation group: 'org.mock-server', name: 
'mockserver-client-java', version: '5.10.0'
diff --git 
a/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/HttpEventPublisher.java
 
b/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/HttpEventPublisher.java
index 6c5537990bd..f34fcb7c4e0 100644
--- 
a/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/HttpEventPublisher.java
+++ 
b/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/HttpEventPublisher.java
@@ -22,9 +22,11 @@ import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Pr
 import com.google.api.client.http.ByteArrayContent;
 import com.google.api.client.http.GZipEncoding;
 import com.google.api.client.http.GenericUrl;
+import com.google.api.client.http.HttpBackOffIOExceptionHandler;
 import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler;
 import 
com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler.BackOffRequired;
 import com.google.api.client.http.HttpContent;
+import com.google.api.client.http.HttpIOExceptionHandler;
 import com.google.api.client.http.HttpMediaType;
 import com.google.api.client.http.HttpRequest;
 import com.google.api.client.http.HttpRequestFactory;
@@ -139,6 +141,9 @@ abstract class HttpEventPublisher {
     responseHandler.setBackOffRequired(BackOffRequired.ON_SERVER_ERROR);
 
     request.setUnsuccessfulResponseHandler(responseHandler);
+    HttpIOExceptionHandler ioExceptionHandler =
+        new HttpBackOffIOExceptionHandler(getConfiguredBackOff());
+    request.setIOExceptionHandler(ioExceptionHandler);
     setHeaders(request, token());
 
     return request.execute();
@@ -180,6 +185,10 @@ abstract class HttpEventPublisher {
    */
   private void setHeaders(HttpRequest request, String token) {
     request.getHeaders().setAuthorization(String.format(AUTHORIZATION_SCHEME, 
token));
+
+    if (enableGzipHttpCompression()) {
+      request.getHeaders().setContentEncoding("gzip");
+    }
   }
 
   /**
diff --git 
a/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEvent.java
 
b/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEvent.java
index 7dd78e1754b..177900a2d09 100644
--- 
a/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEvent.java
+++ 
b/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEvent.java
@@ -20,9 +20,9 @@ package org.apache.beam.sdk.io.splunk;
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.auto.value.AutoValue;
+import com.google.gson.JsonObject;
 import com.google.gson.annotations.SerializedName;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
-import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.coders.DefaultCoder;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 /**
@@ -39,7 +39,7 @@ import org.checkerframework.checker.nullness.qual.Nullable;
  *   <li>index
  * </ul>
  */
-@DefaultSchema(AutoValueSchema.class)
+@DefaultCoder(SplunkEventCoder.class)
 @AutoValue
 public abstract class SplunkEvent {
 
@@ -59,6 +59,8 @@ public abstract class SplunkEvent {
 
   public abstract @Nullable String index();
 
+  public abstract @Nullable JsonObject fields();
+
   public abstract @Nullable String event();
 
   /** A builder class for creating a {@link SplunkEvent}. */
@@ -75,6 +77,8 @@ public abstract class SplunkEvent {
 
     abstract Builder setIndex(String index);
 
+    abstract Builder setFields(JsonObject fields);
+
     abstract Builder setEvent(String event);
 
     abstract String event();
@@ -136,6 +140,17 @@ public abstract class SplunkEvent {
       return setIndex(index);
     }
 
+    /**
+     * Assigns fields value to the event metadata.
+     *
+     * @param fields fields value to assign
+     */
+    public Builder withFields(JsonObject fields) {
+      checkNotNull(fields, "withFields(fields) called with null input.");
+
+      return setFields(fields);
+    }
+
     /**
      * Assigns the event payload to be sent to the HEC endpoint.
      *
diff --git 
a/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEventCoder.java
 
b/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEventCoder.java
new file mode 100644
index 00000000000..35d5314ae9e
--- /dev/null
+++ 
b/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEventCoder.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.splunk;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.CoderProviders;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.commons.io.IOUtils;
+
+/** A {@link org.apache.beam.sdk.coders.Coder} for {@link SplunkEvent} 
objects. Encoded data starts with a version marker byte (VERSION_3); decode() also accepts older unmarked V1/V2 data. */
+public class SplunkEventCoder extends AtomicCoder<SplunkEvent> {
+
+  private static final SplunkEventCoder SPLUNK_EVENT_CODER = new 
SplunkEventCoder();
+
+  private static final TypeDescriptor<SplunkEvent> TYPE_DESCRIPTOR =
+      new TypeDescriptor<SplunkEvent>() {};
+  private static final StringUtf8Coder STRING_UTF_8_CODER = 
StringUtf8Coder.of();
+  private static final NullableCoder<String> STRING_NULLABLE_CODER =
+      NullableCoder.of(STRING_UTF_8_CODER);
+  private static final NullableCoder<Long> LONG_NULLABLE_CODER =
+      NullableCoder.of(BigEndianLongCoder.of());
+
+  private static final Gson GSON = new Gson();
+
+  // Version markers must be >= 2 so they cannot be confused with the leading 0/1 null-indicator byte of unmarked V1/V2 data.
+  private static final int VERSION_3 = 3;
+
+  public static SplunkEventCoder of() {
+    return SPLUNK_EVENT_CODER;
+  }
+
+  public static CoderProvider getCoderProvider() {
+    return CoderProviders.forCoder(TYPE_DESCRIPTOR, SplunkEventCoder.of());
+  }
+
+  @Override
+  @SuppressWarnings("nullness")
+  public void encode(SplunkEvent value, OutputStream out) throws IOException {
+    out.write(VERSION_3);
+
+    LONG_NULLABLE_CODER.encode(value.time(), out);
+    STRING_NULLABLE_CODER.encode(value.host(), out);
+    STRING_NULLABLE_CODER.encode(value.source(), out);
+    STRING_NULLABLE_CODER.encode(value.sourceType(), out);
+    STRING_NULLABLE_CODER.encode(value.index(), out);
+    String fields = value.fields() == null ? null : value.fields().toString();
+    STRING_NULLABLE_CODER.encode(fields, out);
+    STRING_UTF_8_CODER.encode(value.event(), out);
+  }
+
+  @Override
+  public SplunkEvent decode(InputStream in) throws CoderException, IOException 
{
+    SplunkEvent.Builder builder = SplunkEvent.newBuilder();
+
+    int v = in.read(); // NOTE(review): EOF (-1) is not special-cased; it takes the legacy branch below and fails later while decoding.
+
+    // Versions 1 and 2 of this coder had no version marker field, but 1st 
byte in the serialized
+    // data was always 0 or 1 (present/not present indicator for a nullable 
field).
+    // So here we assume if the first byte is >= 2 then it's the version 
marker.
+
+    if (v >= 2) {
+      decodeWithVersion(v, in, builder);
+    } else {
+      // It's impossible to distinguish between V1 and V2 without re-reading 
portions of the input
+      // stream twice (and without the version marker), so we must have a 
ByteArrayInputStream copy,
+      // which is guaranteed to support mark()/reset(). NOTE(review): IOUtils.copy drains everything left in `in`; if this coder is ever nested (e.g. inside a KV or iterable coder), bytes of later elements would be consumed too - confirm legacy data is only decoded at the outer level.
+
+      ByteArrayOutputStream os = new ByteArrayOutputStream();
+      os.write(v);
+      IOUtils.copy(in, os);
+      ByteArrayInputStream streamCopy = new 
ByteArrayInputStream(os.toByteArray());
+
+      decodeVersion1or2(streamCopy, builder);
+    }
+
+    return builder.build();
+  }
+
+  private void decodeWithVersion(int version, InputStream in, 
SplunkEvent.Builder builder)
+      throws IOException {
+
+    decodeCommonFields(in, builder);
+
+    if (version >= VERSION_3) { // NOTE(review): a marker of exactly 2 skips this branch, so no event is set; markers above 3 are decoded as V3 - confirm intended.
+      String fields = STRING_NULLABLE_CODER.decode(in);
+      if (fields != null) {
+        builder.withFields(GSON.fromJson(fields, JsonObject.class));
+      }
+
+      String event = STRING_UTF_8_CODER.decode(in);
+      builder.withEvent(event);
+    }
+  }
+
+  private void decodeVersion1or2(ByteArrayInputStream in, SplunkEvent.Builder 
builder)
+      throws IOException {
+
+    decodeCommonFields(in, builder);
+
+    in.mark(Integer.MAX_VALUE);
+
+    // The following fields may be different between V1 and V2.
+
+    // V1 format: <... common fields...> <event length> <event string>
+    // V2 format: <... common fields...> <fields present indicator byte 0/1>
+    //                <fields length, if present> <fields string> <event 
length> <event string>
+
+    // We try to read this as V2 first. If a CoderException is thrown, fall back to V1 (other exception types propagate).
+
+    // Note: it's impossible to incorrectly parse V1 data with V2 decoder 
(potentially causing
+    // corrupted fields in the message). If we try that and the 1st byte is:
+    //   - 2 or more: decoding fails because V2 expects it to be either 0 or 1 
(present indicator).
+    //   - 1: this means the "event" string length is 1, so we have only 1 
more byte in the stream.
+    //        V2 decoding fails with EOF assuming 1 is the "fields" string 
length and reading
+    //        at least 1 more byte.
+    //   - 0: this means the "event" string is empty, so we have no more bytes 
in the stream.
+    //        V2 decoding fails with EOF assuming 0 is the "fields" string 
length and reading
+    //        the next "event" field.
+
+    JsonObject fields = null;
+    String event;
+
+    try {
+      // Assume V2 first.
+      String fieldsString = STRING_NULLABLE_CODER.decode(in);
+      if (fieldsString != null) {
+        fields = GSON.fromJson(fieldsString, JsonObject.class);
+      }
+      event = STRING_UTF_8_CODER.decode(in);
+    } catch (CoderException e) {
+      // If failed, reset the stream and parse as V1.
+      in.reset();
+      event = STRING_UTF_8_CODER.decode(in);
+    }
+
+    if (fields != null) {
+      builder.withFields(fields);
+    }
+    builder.withEvent(event);
+  }
+
+  private void decodeCommonFields(InputStream in, SplunkEvent.Builder builder) 
throws IOException {
+    Long time = LONG_NULLABLE_CODER.decode(in);
+    if (time != null) {
+      builder.withTime(time);
+    }
+
+    String host = STRING_NULLABLE_CODER.decode(in);
+    if (host != null) {
+      builder.withHost(host);
+    }
+
+    String source = STRING_NULLABLE_CODER.decode(in);
+    if (source != null) {
+      builder.withSource(source);
+    }
+
+    String sourceType = STRING_NULLABLE_CODER.decode(in);
+    if (sourceType != null) {
+      builder.withSourceType(sourceType);
+    }
+
+    String index = STRING_NULLABLE_CODER.decode(in);
+    if (index != null) {
+      builder.withIndex(index);
+    }
+  }
+
+  @Override
+  public TypeDescriptor<SplunkEvent> getEncodedTypeDescriptor() {
+    return TYPE_DESCRIPTOR;
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    throw new NonDeterministicException(
+        this, "SplunkEvent can hold arbitrary instances, which may be 
non-deterministic.");
+  }
+}
diff --git 
a/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEventWriter.java
 
b/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEventWriter.java
index 8ec2a064ee0..615d4e932f4 100644
--- 
a/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEventWriter.java
+++ 
b/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEventWriter.java
@@ -33,8 +33,9 @@ import java.security.KeyManagementException;
 import java.security.KeyStoreException;
 import java.security.NoSuchAlgorithmException;
 import java.security.cert.CertificateException;
-import java.time.Instant;
 import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import 
org.apache.beam.repackaged.core.org.apache.commons.compress.utils.IOUtils;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.fs.MatchResult;
@@ -53,8 +54,11 @@ import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.InetAddresses;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.InternetDomainName;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
 import org.slf4j.Logger;
@@ -70,7 +74,7 @@ import org.slf4j.LoggerFactory;
 })
 abstract class SplunkEventWriter extends DoFn<KV<Integer, SplunkEvent>, 
SplunkWriteError> {
 
-  private static final Integer DEFAULT_BATCH_COUNT = 1;
+  private static final Integer DEFAULT_BATCH_COUNT = 10;
   private static final Boolean DEFAULT_DISABLE_CERTIFICATE_VALIDATION = false;
   private static final Boolean DEFAULT_ENABLE_BATCH_LOGS = true;
   private static final Boolean DEFAULT_ENABLE_GZIP_HTTP_COMPRESSION = true;
@@ -98,6 +102,13 @@ abstract class SplunkEventWriter extends DoFn<KV<Integer, 
SplunkEvent>, SplunkWr
   private static final String COUNT_STATE_NAME = "count";
   private static final String TIME_ID_NAME = "expiry";
 
+  private static final Pattern URL_PATTERN = 
Pattern.compile("^http(s?)://([^:]+)(:[0-9]+)?$");
+
+  @VisibleForTesting
+  protected static final String INVALID_URL_FORMAT_MESSAGE =
+      "Invalid url format. Url format should match PROTOCOL://HOST[:PORT], 
where PORT is optional. "
+          + "Supported Protocols are http and https. eg: http://hostname:8088";;
+
   @StateId(BUFFER_STATE_NAME)
   private final StateSpec<BagState<SplunkEvent>> buffer = StateSpecs.bag();
 
@@ -139,6 +150,7 @@ abstract class SplunkEventWriter extends DoFn<KV<Integer, 
SplunkEvent>, SplunkWr
   public void setup() {
 
     checkArgument(url().isAccessible(), "url is required for writing events.");
+    checkArgument(isValidUrlFormat(url().get()), INVALID_URL_FORMAT_MESSAGE);
     checkArgument(token().isAccessible(), "Access token is required for 
writing events.");
 
     // Either user supplied or default batchCount.
@@ -287,7 +299,7 @@ abstract class SplunkEventWriter extends DoFn<KV<Integer, 
SplunkEvent>, SplunkWr
         response = publisher.execute(events);
 
         if (!response.isSuccessStatusCode()) {
-          UNSUCCESSFUL_WRITE_LATENCY_MS.update(System.nanoTime() - startTime);
+          UNSUCCESSFUL_WRITE_LATENCY_MS.update(nanosToMillis(System.nanoTime() 
- startTime));
           FAILED_WRITES.inc(countState.read());
           int statusCode = response.getStatusCode();
           if (statusCode >= 400 && statusCode < 500) {
@@ -305,7 +317,7 @@ abstract class SplunkEventWriter extends DoFn<KV<Integer, 
SplunkEvent>, SplunkWr
               events, response.getStatusMessage(), response.getStatusCode(), 
receiver);
 
         } else {
-          SUCCESSFUL_WRITE_LATENCY_MS.update(Instant.now().toEpochMilli() - 
startTime);
+          SUCCESSFUL_WRITE_LATENCY_MS.update(nanosToMillis(System.nanoTime() - 
startTime));
           SUCCESS_WRITES.inc(countState.read());
           VALID_REQUESTS.inc();
           SUCCESSFUL_WRITE_BATCH_SIZE.update(countState.read());
@@ -321,7 +333,7 @@ abstract class SplunkEventWriter extends DoFn<KV<Integer, 
SplunkEvent>, SplunkWr
             e.getStatusCode(),
             e.getContent(),
             e.getStatusMessage());
-        UNSUCCESSFUL_WRITE_LATENCY_MS.update(System.nanoTime() - startTime);
+        UNSUCCESSFUL_WRITE_LATENCY_MS.update(nanosToMillis(System.nanoTime() - 
startTime));
         FAILED_WRITES.inc(countState.read());
         int statusCode = e.getStatusCode();
         if (statusCode >= 400 && statusCode < 500) {
@@ -336,7 +348,7 @@ abstract class SplunkEventWriter extends DoFn<KV<Integer, 
SplunkEvent>, SplunkWr
 
       } catch (IOException ioe) {
         LOG.error("Error writing to Splunk: {}", ioe.getMessage());
-        UNSUCCESSFUL_WRITE_LATENCY_MS.update(System.nanoTime() - startTime);
+        UNSUCCESSFUL_WRITE_LATENCY_MS.update(nanosToMillis(System.nanoTime() - 
startTime));
         FAILED_WRITES.inc(countState.read());
         INVALID_REQUESTS.inc();
 
@@ -350,8 +362,21 @@ abstract class SplunkEventWriter extends DoFn<KV<Integer, 
SplunkEvent>, SplunkWr
         bufferState.clear();
         countState.clear();
 
-        if (response != null) {
-          response.disconnect();
+        // We've observed cases where errors at this point can cause the 
pipeline to keep retrying
+        // the same events over and over (e.g. from Dataflow Runner's Pub/Sub 
implementation). Since
+        // the events have either been published or wrapped for error 
handling, we can safely
+        // ignore this error, though there may or may not be a leak of some 
type depending on
+        // HttpResponse's implementation. However, any potential leak would 
still happen if we let
+        // the exception fall through, so this isn't considered a major issue.
+        try {
+          if (response != null) {
+            response.ignore();
+          }
+        } catch (IOException e) {
+          LOG.warn(
+              "Error ignoring response from Splunk. Messages should still have 
published, but there"
+                  + " might be a connection leak.",
+              e);
         }
       }
     }
@@ -426,6 +451,26 @@ abstract class SplunkEventWriter extends DoFn<KV<Integer, 
SplunkEvent>, SplunkWr
     }
   }
 
+  @VisibleForTesting
+  static boolean isValidUrlFormat(String url) {
+    Matcher matcher = URL_PATTERN.matcher(url);
+    if (matcher.find()) {
+      String host = matcher.group(2);
+      return InetAddresses.isInetAddress(host) || 
InternetDomainName.isValid(host);
+    }
+    return false;
+  }
+
+  /**
+   * Converts Nanoseconds to Milliseconds.
+   *
+   * @param ns time in nanoseconds
+   * @return time in milliseconds
+   */
+  private static long nanosToMillis(long ns) {
+    return Math.round(((double) ns) / 1e6);
+  }
+
   @AutoValue.Builder
   abstract static class Builder {
 
@@ -458,6 +503,9 @@ abstract class SplunkEventWriter extends DoFn<KV<Integer, 
SplunkEvent>, SplunkWr
      */
     Builder withUrl(ValueProvider<String> url) {
       checkArgument(url != null, "withURL(url) called with null input.");
+      if (url.isAccessible()) {
+        checkArgument(isValidUrlFormat(url.get()), INVALID_URL_FORMAT_MESSAGE);
+      }
       return setUrl(url);
     }
 
@@ -469,6 +517,7 @@ abstract class SplunkEventWriter extends DoFn<KV<Integer, 
SplunkEvent>, SplunkWr
      */
     Builder withUrl(String url) {
       checkArgument(url != null, "withURL(url) called with null input.");
+      checkArgument(isValidUrlFormat(url), INVALID_URL_FORMAT_MESSAGE);
       return setUrl(ValueProvider.StaticValueProvider.of(url));
     }
 
diff --git 
a/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkIO.java 
b/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkIO.java
index bd1e716951d..2127cc55752 100644
--- 
a/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkIO.java
+++ 
b/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkIO.java
@@ -159,7 +159,6 @@ public class SplunkIO {
               .withRootCaCertificatePath(rootCaCertificatePath())
               .withEnableBatchLogs(enableBatchLogs())
               .withEnableGzipHttpCompression(enableGzipHttpCompression());
-      ;
 
       SplunkEventWriter writer = builder.build();
       LOG.info("SplunkEventWriter configured");
diff --git 
a/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkEventCoderTest.java
 
b/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkEventCoderTest.java
new file mode 100644
index 00000000000..8267e406960
--- /dev/null
+++ 
b/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkEventCoderTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.splunk;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.gson.JsonObject;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
+import org.junit.Test;
+
+/** Unit tests for {@link SplunkEventCoder} class. */
+public class SplunkEventCoderTest {
+
+  /**
+   * Test whether {@link SplunkEventCoder} is able to encode/decode a {@link 
SplunkEvent} correctly.
+   *
+   * @throws IOException if encoding or decoding fails
+   */
+  @Test
+  public void testEncodeDecode() throws IOException {
+
+    String event = "test-event";
+    String host = "test-host";
+    String index = "test-index";
+    String source = "test-source";
+    String sourceType = "test-source-type";
+    Long time = 123456789L;
+
+    SplunkEvent actualEvent =
+        SplunkEvent.newBuilder()
+            .withEvent(event)
+            .withHost(host)
+            .withIndex(index)
+            .withSource(source)
+            .withSourceType(sourceType)
+            .withTime(time)
+            .build();
+
+    SplunkEventCoder coder = SplunkEventCoder.of();
+    try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
+      coder.encode(actualEvent, bos);
+      try (ByteArrayInputStream bin = new 
ByteArrayInputStream(bos.toByteArray())) {
+        SplunkEvent decodedEvent = coder.decode(bin);
+        assertEquals(decodedEvent, actualEvent); // NOTE(review): JUnit expects (expected, actual); the arguments look swapped
+      }
+    }
+  }
+
+  /**
+   * Test whether {@link SplunkEventCoder} is able to encode/decode a {@link 
SplunkEvent} with
+   * metadata 'fields'.
+   *
+   * @throws IOException if encoding or decoding fails
+   */
+  @Test
+  public void testEncodeDecodeFields() throws IOException {
+
+    String event = "test-event";
+    JsonObject fields = new JsonObject();
+    fields.addProperty("test-key", "test-value");
+
+    SplunkEvent actualEvent = 
SplunkEvent.newBuilder().withEvent(event).withFields(fields).build();
+
+    SplunkEventCoder coder = SplunkEventCoder.of();
+    try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
+      coder.encode(actualEvent, bos);
+      try (ByteArrayInputStream bin = new 
ByteArrayInputStream(bos.toByteArray())) {
+        SplunkEvent decodedEvent = coder.decode(bin);
+        assertEquals(decodedEvent, actualEvent); // NOTE(review): JUnit expects (expected, actual); the arguments look swapped
+      }
+    }
+  }
+
+  /**
+   * Tests whether {@link SplunkEventCoder} is able to decode a {@link 
SplunkEvent} encoded using
+   * the older coder version 1 (commit f0ff6cc).
+   */
+  @Test
+  public void testBackwardsCompatibility_canDecodeVersion1() throws 
IOException, DecoderException {
+
+    SplunkEvent expectedEvent =
+        SplunkEvent.newBuilder()
+            .withEvent("e")
+            .withHost("h")
+            .withIndex("i")
+            .withSource("s")
+            .withSourceType("st")
+            .withTime(1234L)
+            .build();
+
+    String hex = "0100000000000004d2010168010173010273740101690165";
+    SplunkEvent actualEvent = SplunkEventCoder.of().decode(fromHex(hex));
+
+    assertEquals(expectedEvent, actualEvent);
+  }
+
+  /**
+   * Tests whether {@link SplunkEventCoder} is able to decode a {@link 
SplunkEvent} encoded using
+   * the older coder version 1 (commit f0ff6cc) and having an empty "event" 
field.
+   *
+   * <p>An empty field is encoded as <code>00</code>, which may look like the 
present/not present
+   * marker for the "fields" field in V2.
+   */
+  @Test
+  public void testBackwardsCompatibility_canDecodeVersion1withEmptyEvent()
+      throws IOException, DecoderException {
+
+    SplunkEvent expectedEvent =
+        SplunkEvent.newBuilder()
+            .withEvent("")
+            .withHost("h")
+            .withIndex("i")
+            .withSource("s")
+            .withSourceType("st")
+            .withTime(1234L)
+            .build();
+
+    String hex = "0100000000000004d20101680101730102737401016900";
+    SplunkEvent actualEvent = SplunkEventCoder.of().decode(fromHex(hex));
+
+    assertEquals(expectedEvent, actualEvent);
+  }
+
+  /**
+   * Tests whether {@link SplunkEventCoder} is able to decode a {@link 
SplunkEvent} encoded using
+   * the older coder version 1 (commit f0ff6cc) and having the "event" field 
of length 1.
+   *
+   * <p>This is a special case when "event" is of length 1 and the first 
character code is 00. This
+   * is encoded as byte sequence 01 00 by V1 coder, which can be treated as an 
empty "fields" field
+   * by V2 decoder.
+   */
+  @Test
+  public void testBackwardsCompatibility_canDecodeVersion1withEventLength1()
+      throws IOException, DecoderException {
+
+    SplunkEvent expectedEvent =
+        SplunkEvent.newBuilder()
+            .withEvent(new String(new byte[] {0}, StandardCharsets.UTF_8))
+            .withHost("h")
+            .withIndex("i")
+            .withSource("s")
+            .withSourceType("st")
+            .withTime(1234L)
+            .build();
+
+    String hex = "0100000000000004d2010168010173010273740101690100";
+    SplunkEvent actualEvent = SplunkEventCoder.of().decode(fromHex(hex));
+
+    assertEquals(expectedEvent, actualEvent);
+  }
+
+  /**
+   * Tests whether {@link SplunkEventCoder} is able to decode a {@link 
SplunkEvent} encoded using
+   * the older coder version 2 (commit 5e53040), without the newly added 
"fields" field.
+   */
+  @Test
+  public void testBackwardsCompatibility_canDecodeVersion2() throws 
IOException, DecoderException {
+
+    SplunkEvent expectedEvent =
+        SplunkEvent.newBuilder()
+            .withEvent("e")
+            .withHost("h")
+            .withIndex("i")
+            .withSource("s")
+            .withSourceType("st")
+            .withTime(1234L)
+            .build();
+
+    String hex = "0100000000000004d201016801017301027374010169000165";
+    SplunkEvent actualEvent = SplunkEventCoder.of().decode(fromHex(hex));
+
+    assertEquals(expectedEvent, actualEvent);
+  }
+
+  /**
+   * Tests whether {@link SplunkEventCoder} is able to decode a {@link 
SplunkEvent} encoded using
+   * the older coder version 2 (commit 5e53040), with the newly added "fields" 
field.
+   */
+  @Test
+  public void testBackwardsCompatibility_canDecodeVersion2withFields()
+      throws IOException, DecoderException {
+
+    JsonObject fields = new JsonObject();
+    fields.addProperty("k", "v");
+
+    SplunkEvent expectedEvent =
+        SplunkEvent.newBuilder()
+            .withEvent("e")
+            .withHost("h")
+            .withIndex("i")
+            .withSource("s")
+            .withSourceType("st")
+            .withTime(1234L)
+            .withFields(fields)
+            .build();
+
+    String hex = 
"0100000000000004d20101680101730102737401016901097b226b223a2276227d0165";
+    SplunkEvent actualEvent = SplunkEventCoder.of().decode(fromHex(hex));
+
+    assertEquals(expectedEvent, actualEvent);
+  }
+
+  private static InputStream fromHex(String hex) throws DecoderException {
+    byte[] b = Hex.decodeHex(hex);
+    return new ByteArrayInputStream(b);
+  }
+}
diff --git 
a/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkEventTest.java
 
b/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkEventTest.java
index 29769526d24..749086bac43 100644
--- 
a/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkEventTest.java
+++ 
b/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkEventTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.splunk;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 
+import com.google.gson.JsonObject;
 import org.junit.Test;
 
 /** Unit tests for {@link SplunkEvent} class. */
@@ -34,6 +35,8 @@ public class SplunkEventTest {
     String source = "test-source";
     String sourceType = "test-source-type";
     Long time = 123456789L;
+    JsonObject fields = new JsonObject();
+    fields.addProperty("test-key", "test-value");
 
     SplunkEvent actualEvent =
         SplunkEvent.newBuilder()
@@ -43,6 +46,7 @@ public class SplunkEventTest {
             .withSource(source)
             .withSourceType(sourceType)
             .withTime(time)
+            .withFields(fields)
             .create();
 
     assertEquals(
@@ -53,6 +57,7 @@ public class SplunkEventTest {
             .withSource(source)
             .withSourceType(sourceType)
             .withTime(time)
+            .withFields(fields)
             .create(),
         actualEvent);
 
diff --git 
a/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkEventWriterTest.java
 
b/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkEventWriterTest.java
index 3633844ab6d..f4d8c1a5e13 100644
--- 
a/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkEventWriterTest.java
+++ 
b/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkEventWriterTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io.splunk;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -62,6 +63,21 @@ public class SplunkEventWriterTest {
 
   private MockServerClient mockServerClient;
 
+  @Test
+  public void testMissingURLProtocol() {
+    assertFalse(SplunkEventWriter.isValidUrlFormat("test-url"));
+  }
+
+  @Test
+  public void testInvalidURL() {
+    assertFalse(SplunkEventWriter.isValidUrlFormat("http://1.2.3";));
+  }
+
+  @Test
+  public void testValidURL() {
+    assertTrue(SplunkEventWriter.isValidUrlFormat("http://test-url";));
+  }
+
   @Test
   public void eventWriterMissingURL() {
 
@@ -71,13 +87,51 @@ public class SplunkEventWriterTest {
     assertTrue(thrown.getMessage().contains("url needs to be provided"));
   }
 
+  @Test
+  public void eventWriterMissingURLProtocol() {
+    Exception thrown =
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> SplunkEventWriter.newBuilder().withUrl("test-url").build());
+
+    
assertTrue(thrown.getMessage().contains(SplunkEventWriter.INVALID_URL_FORMAT_MESSAGE));
+  }
+
+  /** Test building {@link SplunkEventWriter} with an invalid URL. */
+  @Test
+  public void eventWriterInvalidURL() {
+    Exception thrown =
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> 
SplunkEventWriter.newBuilder().withUrl("http://1.2.3";).build());
+
+    
assertTrue(thrown.getMessage().contains(SplunkEventWriter.INVALID_URL_FORMAT_MESSAGE));
+  }
+
+  /**
+   * Test building {@link SplunkEventWriter} with the 
'services/collector/event' path appended to
+   * the URL.
+   */
+  @Test
+  public void eventWriterFullEndpoint() {
+    Exception thrown =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                SplunkEventWriter.newBuilder()
+                    .withUrl("http://test-url:8088/services/collector/event";)
+                    .build());
+
+    
assertTrue(thrown.getMessage().contains(SplunkEventWriter.INVALID_URL_FORMAT_MESSAGE));
+  }
+
   @Test
   public void eventWriterMissingToken() {
 
     Exception thrown =
         assertThrows(
             NullPointerException.class,
-            () -> SplunkEventWriter.newBuilder().withUrl("test-url").build());
+            () -> 
SplunkEventWriter.newBuilder().withUrl("http://test-url";).build());
 
     assertTrue(thrown.getMessage().contains("token needs to be provided"));
   }
@@ -86,7 +140,7 @@ public class SplunkEventWriterTest {
   public void eventWriterDefaultBatchCountAndValidation() {
 
     SplunkEventWriter writer =
-        
SplunkEventWriter.newBuilder().withUrl("test-url").withToken("test-token").build();
+        
SplunkEventWriter.newBuilder().withUrl("http://test-url";).withToken("test-token").build();
 
     assertNull(writer.inputBatchCount());
     assertNull(writer.disableCertificateValidation());
@@ -99,7 +153,7 @@ public class SplunkEventWriterTest {
     Boolean certificateValidation = false;
     SplunkEventWriter writer =
         SplunkEventWriter.newBuilder()
-            .withUrl("test-url")
+            .withUrl("http://test-url";)
             .withToken("test-token")
             .withInputBatchCount(StaticValueProvider.of(batchCount))
             
.withDisableCertificateValidation(StaticValueProvider.of(certificateValidation))
@@ -144,7 +198,6 @@ public class SplunkEventWriterTest {
     PCollection<SplunkWriteError> actual =
         pipeline
             .apply("Create Input data", Create.of(testEvents))
-            // .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), 
SplunkEventCoder.of())))
             .apply(
                 "SplunkEventWriter",
                 ParDo.of(
@@ -200,7 +253,6 @@ public class SplunkEventWriterTest {
     PCollection<SplunkWriteError> actual =
         pipeline
             .apply("Create Input data", Create.of(testEvents))
-            // .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), 
SplunkEventCoder.of())))
             .apply(
                 "SplunkEventWriter",
                 ParDo.of(
@@ -246,7 +298,6 @@ public class SplunkEventWriterTest {
     PCollection<SplunkWriteError> actual =
         pipeline
             .apply("Create Input data", Create.of(testEvents))
-            // .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), 
SplunkEventCoder.of())))
             .apply(
                 "SplunkEventWriter",
                 ParDo.of(
diff --git 
a/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkIOTest.java
 
b/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkIOTest.java
index 32c98513ea2..d2cfd59aace 100644
--- 
a/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkIOTest.java
+++ 
b/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkIOTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.splunk;
 
+import com.google.gson.JsonObject;
 import java.util.List;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
@@ -63,7 +64,8 @@ public class SplunkIOTest {
     int testPort = mockServerRule.getPort();
     String url = Joiner.on(':').join("http://localhost";, testPort);
     String token = "test-token";
-
+    JsonObject fields = new JsonObject();
+    fields.addProperty("customfield", 1);
     List<SplunkEvent> testEvents =
         ImmutableList.of(
             SplunkEvent.newBuilder()
@@ -73,6 +75,7 @@ public class SplunkIOTest {
                 .withSource("test-source-1")
                 .withSourceType("test-source-type-1")
                 .withTime(12345L)
+                .withFields(fields)
                 .create(),
             SplunkEvent.newBuilder()
                 .withEvent("test-event-2")
@@ -81,11 +84,12 @@ public class SplunkIOTest {
                 .withSource("test-source-2")
                 .withSourceType("test-source-type-2")
                 .withTime(12345L)
+                .withFields(fields)
                 .create());
 
     PCollection<SplunkWriteError> actual =
         pipeline
-            .apply("Create Input data", Create.of(testEvents)) // 
.withCoder(SplunkEventCoder.of()))
+            .apply("Create Input data", Create.of(testEvents))
             .apply(
                 "SplunkIO",
                 SplunkIO.write(url, 
token).withParallelism(1).withBatchCount(testEvents.size()));
@@ -132,7 +136,7 @@ public class SplunkIOTest {
 
     PCollection<SplunkWriteError> actual =
         pipeline
-            .apply("Create Input data", Create.of(testEvents)) // 
.withCoder(SplunkEventCoder.of()))
+            .apply("Create Input data", Create.of(testEvents))
             .apply(
                 "SplunkIO",
                 SplunkIO.write(url, token)
@@ -182,7 +186,7 @@ public class SplunkIOTest {
 
     PCollection<SplunkWriteError> actual =
         pipeline
-            .apply("Create Input data", Create.of(testEvents)) // 
.withCoder(SplunkEventCoder.of()))
+            .apply("Create Input data", Create.of(testEvents))
             .apply(
                 "SplunkIO",
                 SplunkIO.write(url, 
token).withParallelism(testParallelism).withBatchCount(1));


Reply via email to