This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 1284d769892 Fix various issues in SplunkIO (#28825)
1284d769892 is described below
commit 1284d769892ddbba13be2fca8daa8a387c71c122
Author: Pranav Bhandari <[email protected]>
AuthorDate: Fri Oct 20 16:19:04 2023 -0400
Fix various issues in SplunkIO (#28825)
* Fix GZIP compression in HttpEventPublisher.
* Add checks to make sure the provided URL is valid.
* Fix issue with DefaultCoder in AutoValue generated classes.
* Add support for Splunk `fields` metadata.
Also fix Coder issues for SplunkEvent.
* Address comments.
---
.../org/apache/beam/sdk/coders/CoderProviders.java | 7 +-
.../org/apache/beam/sdk/coders/DefaultCoder.java | 8 +
sdks/java/io/splunk/build.gradle | 1 +
.../beam/sdk/io/splunk/HttpEventPublisher.java | 9 +
.../org/apache/beam/sdk/io/splunk/SplunkEvent.java | 21 +-
.../beam/sdk/io/splunk/SplunkEventCoder.java | 206 +++++++++++++++++++
.../beam/sdk/io/splunk/SplunkEventWriter.java | 65 +++++-
.../org/apache/beam/sdk/io/splunk/SplunkIO.java | 1 -
.../beam/sdk/io/splunk/SplunkEventCoderTest.java | 228 +++++++++++++++++++++
.../apache/beam/sdk/io/splunk/SplunkEventTest.java | 5 +
.../beam/sdk/io/splunk/SplunkEventWriterTest.java | 63 +++++-
.../apache/beam/sdk/io/splunk/SplunkIOTest.java | 12 +-
12 files changed, 603 insertions(+), 23 deletions(-)
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviders.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviders.java
index 8e47f4f2bc9..e0a3199d0c6 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviders.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderProviders.java
@@ -178,7 +178,12 @@ public final class CoderProviders {
@Override
public <T> Coder<T> coderFor(TypeDescriptor<T> type, List<? extends
Coder<?>> componentCoders)
throws CannotProvideCoderException {
- if (!this.type.equals(type)) {
+ boolean isTypeEqual = this.type.equals(type);
+ boolean isAutoValueConcrete =
+ type.getRawType().getName().contains("AutoValue_")
+ && this.type.getRawType().isAssignableFrom(type.getRawType());
+
+ if (!isTypeEqual && !isAutoValueConcrete) {
throw new CannotProvideCoderException(
String.format(
"Unable to provide coder for %s, this factory can only provide
coders for %s",
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DefaultCoder.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DefaultCoder.java
index 52718fcde2a..782a77cde68 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DefaultCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DefaultCoder.java
@@ -88,6 +88,14 @@ public @interface DefaultCoder {
Class<?> clazz = typeDescriptor.getRawType();
DefaultCoder defaultAnnotation =
clazz.getAnnotation(DefaultCoder.class);
+ if (defaultAnnotation == null) {
+ // check if the superclass has DefaultCoder annotation if the class
is generated using
+ // AutoValue
+ if (clazz.getName().contains("AutoValue_")) {
+ clazz = clazz.getSuperclass();
+ defaultAnnotation = clazz.getAnnotation(DefaultCoder.class);
+ }
+ }
if (defaultAnnotation == null) {
throw new CannotProvideCoderException(
String.format("Class %s does not have a @DefaultCoder
annotation.", clazz.getName()));
diff --git a/sdks/java/io/splunk/build.gradle b/sdks/java/io/splunk/build.gradle
index dd1b15e10dd..41a7a409e89 100644
--- a/sdks/java/io/splunk/build.gradle
+++ b/sdks/java/io/splunk/build.gradle
@@ -37,6 +37,7 @@ dependencies {
implementation library.java.joda_time
implementation library.java.slf4j_api
implementation library.java.vendored_guava_32_1_2_jre
+ implementation library.java.commons_io
testImplementation library.java.junit
testImplementation group: 'org.mock-server', name:
'mockserver-junit-rule', version: '5.10.0'
testImplementation group: 'org.mock-server', name:
'mockserver-client-java', version: '5.10.0'
diff --git
a/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/HttpEventPublisher.java
b/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/HttpEventPublisher.java
index 6c5537990bd..f34fcb7c4e0 100644
---
a/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/HttpEventPublisher.java
+++
b/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/HttpEventPublisher.java
@@ -22,9 +22,11 @@ import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Pr
import com.google.api.client.http.ByteArrayContent;
import com.google.api.client.http.GZipEncoding;
import com.google.api.client.http.GenericUrl;
+import com.google.api.client.http.HttpBackOffIOExceptionHandler;
import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler;
import
com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler.BackOffRequired;
import com.google.api.client.http.HttpContent;
+import com.google.api.client.http.HttpIOExceptionHandler;
import com.google.api.client.http.HttpMediaType;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpRequestFactory;
@@ -139,6 +141,9 @@ abstract class HttpEventPublisher {
responseHandler.setBackOffRequired(BackOffRequired.ON_SERVER_ERROR);
request.setUnsuccessfulResponseHandler(responseHandler);
+ HttpIOExceptionHandler ioExceptionHandler =
+ new HttpBackOffIOExceptionHandler(getConfiguredBackOff());
+ request.setIOExceptionHandler(ioExceptionHandler);
setHeaders(request, token());
return request.execute();
@@ -180,6 +185,10 @@ abstract class HttpEventPublisher {
*/
private void setHeaders(HttpRequest request, String token) {
request.getHeaders().setAuthorization(String.format(AUTHORIZATION_SCHEME,
token));
+
+ if (enableGzipHttpCompression()) {
+ request.getHeaders().setContentEncoding("gzip");
+ }
}
/**
diff --git
a/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEvent.java
b/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEvent.java
index 7dd78e1754b..177900a2d09 100644
---
a/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEvent.java
+++
b/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEvent.java
@@ -20,9 +20,9 @@ package org.apache.beam.sdk.io.splunk;
import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
import com.google.auto.value.AutoValue;
+import com.google.gson.JsonObject;
import com.google.gson.annotations.SerializedName;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
-import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.coders.DefaultCoder;
import org.checkerframework.checker.nullness.qual.Nullable;
/**
@@ -39,7 +39,7 @@ import org.checkerframework.checker.nullness.qual.Nullable;
* <li>index
* </ul>
*/
-@DefaultSchema(AutoValueSchema.class)
+@DefaultCoder(SplunkEventCoder.class)
@AutoValue
public abstract class SplunkEvent {
@@ -59,6 +59,8 @@ public abstract class SplunkEvent {
public abstract @Nullable String index();
+ public abstract @Nullable JsonObject fields();
+
public abstract @Nullable String event();
/** A builder class for creating a {@link SplunkEvent}. */
@@ -75,6 +77,8 @@ public abstract class SplunkEvent {
abstract Builder setIndex(String index);
+ abstract Builder setFields(JsonObject fields);
+
abstract Builder setEvent(String event);
abstract String event();
@@ -136,6 +140,17 @@ public abstract class SplunkEvent {
return setIndex(index);
}
+ /**
+ * Assigns fields value to the event metadata.
+ *
+ * @param fields fields value to assign
+ */
+ public Builder withFields(JsonObject fields) {
+ checkNotNull(fields, "withFields(fields) called with null input.");
+
+ return setFields(fields);
+ }
+
/**
* Assigns the event payload to be sent to the HEC endpoint.
*
diff --git
a/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEventCoder.java
b/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEventCoder.java
new file mode 100644
index 00000000000..35d5314ae9e
--- /dev/null
+++
b/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEventCoder.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.splunk;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.CoderProviders;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.commons.io.IOUtils;
+
+/** A {@link org.apache.beam.sdk.coders.Coder} for {@link SplunkEvent}
objects. */
+public class SplunkEventCoder extends AtomicCoder<SplunkEvent> {
+
+ private static final SplunkEventCoder SPLUNK_EVENT_CODER = new
SplunkEventCoder();
+
+ private static final TypeDescriptor<SplunkEvent> TYPE_DESCRIPTOR =
+ new TypeDescriptor<SplunkEvent>() {};
+ private static final StringUtf8Coder STRING_UTF_8_CODER =
StringUtf8Coder.of();
+ private static final NullableCoder<String> STRING_NULLABLE_CODER =
+ NullableCoder.of(STRING_UTF_8_CODER);
+ private static final NullableCoder<Long> LONG_NULLABLE_CODER =
+ NullableCoder.of(BigEndianLongCoder.of());
+
+ private static final Gson GSON = new Gson();
+
+ // Version markers must be >= 2.
+ private static final int VERSION_3 = 3;
+
+ public static SplunkEventCoder of() {
+ return SPLUNK_EVENT_CODER;
+ }
+
+ public static CoderProvider getCoderProvider() {
+ return CoderProviders.forCoder(TYPE_DESCRIPTOR, SplunkEventCoder.of());
+ }
+
+ @Override
+ @SuppressWarnings("nullness")
+ public void encode(SplunkEvent value, OutputStream out) throws IOException {
+ out.write(VERSION_3);
+
+ LONG_NULLABLE_CODER.encode(value.time(), out);
+ STRING_NULLABLE_CODER.encode(value.host(), out);
+ STRING_NULLABLE_CODER.encode(value.source(), out);
+ STRING_NULLABLE_CODER.encode(value.sourceType(), out);
+ STRING_NULLABLE_CODER.encode(value.index(), out);
+ String fields = value.fields() == null ? null : value.fields().toString();
+ STRING_NULLABLE_CODER.encode(fields, out);
+ STRING_UTF_8_CODER.encode(value.event(), out);
+ }
+
+ @Override
+ public SplunkEvent decode(InputStream in) throws CoderException, IOException
{
+ SplunkEvent.Builder builder = SplunkEvent.newBuilder();
+
+ int v = in.read();
+
+ // Versions 1 and 2 of this coder had no version marker field, but 1st
byte in the serialized
+ // data was always 0 or 1 (present/not present indicator for a nullable
field).
+ // So here we assume if the first byte is >= 2 then it's the version
marker.
+
+ if (v >= 2) {
+ decodeWithVersion(v, in, builder);
+ } else {
+ // It's impossible to distinguish between V1 and V2 without re-reading
portions of the input
+ // stream twice (and without the version marker), so we must have a
ByteArrayInputStream copy,
+ // which is guaranteed to support mark()/reset().
+
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
+ os.write(v);
+ IOUtils.copy(in, os);
+ ByteArrayInputStream streamCopy = new
ByteArrayInputStream(os.toByteArray());
+
+ decodeVersion1or2(streamCopy, builder);
+ }
+
+ return builder.build();
+ }
+
+ private void decodeWithVersion(int version, InputStream in,
SplunkEvent.Builder builder)
+ throws IOException {
+
+ decodeCommonFields(in, builder);
+
+ if (version >= VERSION_3) {
+ String fields = STRING_NULLABLE_CODER.decode(in);
+ if (fields != null) {
+ builder.withFields(GSON.fromJson(fields, JsonObject.class));
+ }
+
+ String event = STRING_UTF_8_CODER.decode(in);
+ builder.withEvent(event);
+ }
+ }
+
+ private void decodeVersion1or2(ByteArrayInputStream in, SplunkEvent.Builder
builder)
+ throws IOException {
+
+ decodeCommonFields(in, builder);
+
+ in.mark(Integer.MAX_VALUE);
+
+ // The following fields may be different between V1 and V2.
+
+ // V1 format: <... common fields...> <event length> <event string>
+ // V2 format: <... common fields...> <fields present indicator byte 0/1>
+ // <fields length, if present> <fields string> <event
length> <event string>
+
+ // We try to read this as V2 first. If any exception, fall back to V1.
+
+ // Note: it's impossible to incorrectly parse V1 data with V2 decoder
(potentially causing
+ // corrupted fields in the message). If we try that and the 1st byte is:
+ // - 2 or more: decoding fails because V2 expects it to be either 0 or 1
(present indicator).
+ // - 1: this means the "event" string length is 1, so we have only 1
more byte in the stream.
+ // V2 decoding fails with EOF assuming 1 is the "fields" string
length and reading
+ // at least 1 more byte.
+ // - 0: this means the "event" string is empty, so we have no more bytes
in the stream.
+ // V2 decoding fails with EOF assuming 0 is the "fields" string
length and reading
+ // the next "event" field.
+
+ JsonObject fields = null;
+ String event;
+
+ try {
+ // Assume V2 first.
+ String fieldsString = STRING_NULLABLE_CODER.decode(in);
+ if (fieldsString != null) {
+ fields = GSON.fromJson(fieldsString, JsonObject.class);
+ }
+ event = STRING_UTF_8_CODER.decode(in);
+ } catch (CoderException e) {
+ // If failed, reset the stream and parse as V1.
+ in.reset();
+ event = STRING_UTF_8_CODER.decode(in);
+ }
+
+ if (fields != null) {
+ builder.withFields(fields);
+ }
+ builder.withEvent(event);
+ }
+
+ private void decodeCommonFields(InputStream in, SplunkEvent.Builder builder)
throws IOException {
+ Long time = LONG_NULLABLE_CODER.decode(in);
+ if (time != null) {
+ builder.withTime(time);
+ }
+
+ String host = STRING_NULLABLE_CODER.decode(in);
+ if (host != null) {
+ builder.withHost(host);
+ }
+
+ String source = STRING_NULLABLE_CODER.decode(in);
+ if (source != null) {
+ builder.withSource(source);
+ }
+
+ String sourceType = STRING_NULLABLE_CODER.decode(in);
+ if (sourceType != null) {
+ builder.withSourceType(sourceType);
+ }
+
+ String index = STRING_NULLABLE_CODER.decode(in);
+ if (index != null) {
+ builder.withIndex(index);
+ }
+ }
+
+ @Override
+ public TypeDescriptor<SplunkEvent> getEncodedTypeDescriptor() {
+ return TYPE_DESCRIPTOR;
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ throw new NonDeterministicException(
+ this, "SplunkEvent can hold arbitrary instances, which may be
non-deterministic.");
+ }
+}
diff --git
a/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEventWriter.java
b/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEventWriter.java
index 8ec2a064ee0..615d4e932f4 100644
---
a/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEventWriter.java
+++
b/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEventWriter.java
@@ -33,8 +33,9 @@ import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
-import java.time.Instant;
import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import
org.apache.beam.repackaged.core.org.apache.commons.compress.utils.IOUtils;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult;
@@ -53,8 +54,11 @@ import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.InetAddresses;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.InternetDomainName;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.slf4j.Logger;
@@ -70,7 +74,7 @@ import org.slf4j.LoggerFactory;
})
abstract class SplunkEventWriter extends DoFn<KV<Integer, SplunkEvent>,
SplunkWriteError> {
- private static final Integer DEFAULT_BATCH_COUNT = 1;
+ private static final Integer DEFAULT_BATCH_COUNT = 10;
private static final Boolean DEFAULT_DISABLE_CERTIFICATE_VALIDATION = false;
private static final Boolean DEFAULT_ENABLE_BATCH_LOGS = true;
private static final Boolean DEFAULT_ENABLE_GZIP_HTTP_COMPRESSION = true;
@@ -98,6 +102,13 @@ abstract class SplunkEventWriter extends DoFn<KV<Integer,
SplunkEvent>, SplunkWr
private static final String COUNT_STATE_NAME = "count";
private static final String TIME_ID_NAME = "expiry";
+ private static final Pattern URL_PATTERN =
Pattern.compile("^http(s?)://([^:]+)(:[0-9]+)?$");
+
+ @VisibleForTesting
+ protected static final String INVALID_URL_FORMAT_MESSAGE =
+ "Invalid url format. Url format should match PROTOCOL://HOST[:PORT],
where PORT is optional. "
+ + "Supported Protocols are http and https. eg: http://hostname:8088";
+
@StateId(BUFFER_STATE_NAME)
private final StateSpec<BagState<SplunkEvent>> buffer = StateSpecs.bag();
@@ -139,6 +150,7 @@ abstract class SplunkEventWriter extends DoFn<KV<Integer,
SplunkEvent>, SplunkWr
public void setup() {
checkArgument(url().isAccessible(), "url is required for writing events.");
+ checkArgument(isValidUrlFormat(url().get()), INVALID_URL_FORMAT_MESSAGE);
checkArgument(token().isAccessible(), "Access token is required for
writing events.");
// Either user supplied or default batchCount.
@@ -287,7 +299,7 @@ abstract class SplunkEventWriter extends DoFn<KV<Integer,
SplunkEvent>, SplunkWr
response = publisher.execute(events);
if (!response.isSuccessStatusCode()) {
- UNSUCCESSFUL_WRITE_LATENCY_MS.update(System.nanoTime() - startTime);
+ UNSUCCESSFUL_WRITE_LATENCY_MS.update(nanosToMillis(System.nanoTime()
- startTime));
FAILED_WRITES.inc(countState.read());
int statusCode = response.getStatusCode();
if (statusCode >= 400 && statusCode < 500) {
@@ -305,7 +317,7 @@ abstract class SplunkEventWriter extends DoFn<KV<Integer,
SplunkEvent>, SplunkWr
events, response.getStatusMessage(), response.getStatusCode(),
receiver);
} else {
- SUCCESSFUL_WRITE_LATENCY_MS.update(Instant.now().toEpochMilli() -
startTime);
+ SUCCESSFUL_WRITE_LATENCY_MS.update(nanosToMillis(System.nanoTime() -
startTime));
SUCCESS_WRITES.inc(countState.read());
VALID_REQUESTS.inc();
SUCCESSFUL_WRITE_BATCH_SIZE.update(countState.read());
@@ -321,7 +333,7 @@ abstract class SplunkEventWriter extends DoFn<KV<Integer,
SplunkEvent>, SplunkWr
e.getStatusCode(),
e.getContent(),
e.getStatusMessage());
- UNSUCCESSFUL_WRITE_LATENCY_MS.update(System.nanoTime() - startTime);
+ UNSUCCESSFUL_WRITE_LATENCY_MS.update(nanosToMillis(System.nanoTime() -
startTime));
FAILED_WRITES.inc(countState.read());
int statusCode = e.getStatusCode();
if (statusCode >= 400 && statusCode < 500) {
@@ -336,7 +348,7 @@ abstract class SplunkEventWriter extends DoFn<KV<Integer,
SplunkEvent>, SplunkWr
} catch (IOException ioe) {
LOG.error("Error writing to Splunk: {}", ioe.getMessage());
- UNSUCCESSFUL_WRITE_LATENCY_MS.update(System.nanoTime() - startTime);
+ UNSUCCESSFUL_WRITE_LATENCY_MS.update(nanosToMillis(System.nanoTime() -
startTime));
FAILED_WRITES.inc(countState.read());
INVALID_REQUESTS.inc();
@@ -350,8 +362,21 @@ abstract class SplunkEventWriter extends DoFn<KV<Integer,
SplunkEvent>, SplunkWr
bufferState.clear();
countState.clear();
- if (response != null) {
- response.disconnect();
+ // We've observed cases where errors at this point can cause the
pipeline to keep retrying
+ // the same events over and over (e.g. from Dataflow Runner's Pub/Sub
implementation). Since
+ // the events have either been published or wrapped for error
handling, we can safely
+ // ignore this error, though there may or may not be a leak of some
type depending on
+ // HttpResponse's implementation. However, any potential leak would
still happen if we let
+ // the exception fall through, so this isn't considered a major issue.
+ try {
+ if (response != null) {
+ response.ignore();
+ }
+ } catch (IOException e) {
+ LOG.warn(
+ "Error ignoring response from Splunk. Messages should still have
published, but there"
+ + " might be a connection leak.",
+ e);
}
}
}
@@ -426,6 +451,26 @@ abstract class SplunkEventWriter extends DoFn<KV<Integer,
SplunkEvent>, SplunkWr
}
}
+ @VisibleForTesting
+ static boolean isValidUrlFormat(String url) {
+ Matcher matcher = URL_PATTERN.matcher(url);
+ if (matcher.find()) {
+ String host = matcher.group(2);
+ return InetAddresses.isInetAddress(host) ||
InternetDomainName.isValid(host);
+ }
+ return false;
+ }
+
+ /**
+ * Converts Nanoseconds to Milliseconds.
+ *
+ * @param ns time in nanoseconds
+ * @return time in milliseconds
+ */
+ private static long nanosToMillis(long ns) {
+ return Math.round(((double) ns) / 1e6);
+ }
+
@AutoValue.Builder
abstract static class Builder {
@@ -458,6 +503,9 @@ abstract class SplunkEventWriter extends DoFn<KV<Integer,
SplunkEvent>, SplunkWr
*/
Builder withUrl(ValueProvider<String> url) {
checkArgument(url != null, "withURL(url) called with null input.");
+ if (url.isAccessible()) {
+ checkArgument(isValidUrlFormat(url.get()), INVALID_URL_FORMAT_MESSAGE);
+ }
return setUrl(url);
}
@@ -469,6 +517,7 @@ abstract class SplunkEventWriter extends DoFn<KV<Integer,
SplunkEvent>, SplunkWr
*/
Builder withUrl(String url) {
checkArgument(url != null, "withURL(url) called with null input.");
+ checkArgument(isValidUrlFormat(url), INVALID_URL_FORMAT_MESSAGE);
return setUrl(ValueProvider.StaticValueProvider.of(url));
}
diff --git
a/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkIO.java
b/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkIO.java
index bd1e716951d..2127cc55752 100644
---
a/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkIO.java
+++
b/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkIO.java
@@ -159,7 +159,6 @@ public class SplunkIO {
.withRootCaCertificatePath(rootCaCertificatePath())
.withEnableBatchLogs(enableBatchLogs())
.withEnableGzipHttpCompression(enableGzipHttpCompression());
- ;
SplunkEventWriter writer = builder.build();
LOG.info("SplunkEventWriter configured");
diff --git
a/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkEventCoderTest.java
b/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkEventCoderTest.java
new file mode 100644
index 00000000000..8267e406960
--- /dev/null
+++
b/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkEventCoderTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.splunk;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.gson.JsonObject;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
+import org.junit.Test;
+
+/** Unit tests for {@link SplunkEventCoder} class. */
+public class SplunkEventCoderTest {
+
+ /**
+ * Test whether {@link SplunkEventCoder} is able to encode/decode a {@link
SplunkEvent} correctly.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testEncodeDecode() throws IOException {
+
+ String event = "test-event";
+ String host = "test-host";
+ String index = "test-index";
+ String source = "test-source";
+ String sourceType = "test-source-type";
+ Long time = 123456789L;
+
+ SplunkEvent actualEvent =
+ SplunkEvent.newBuilder()
+ .withEvent(event)
+ .withHost(host)
+ .withIndex(index)
+ .withSource(source)
+ .withSourceType(sourceType)
+ .withTime(time)
+ .build();
+
+ SplunkEventCoder coder = SplunkEventCoder.of();
+ try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
+ coder.encode(actualEvent, bos);
+ try (ByteArrayInputStream bin = new
ByteArrayInputStream(bos.toByteArray())) {
+ SplunkEvent decodedEvent = coder.decode(bin);
+ assertEquals(decodedEvent, actualEvent);
+ }
+ }
+ }
+
+ /**
+ * Test whether {@link SplunkEventCoder} is able to encode/decode a {@link
SplunkEvent} with
+ * metadata 'fields'.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testEncodeDecodeFields() throws IOException {
+
+ String event = "test-event";
+ JsonObject fields = new JsonObject();
+ fields.addProperty("test-key", "test-value");
+
+ SplunkEvent actualEvent =
SplunkEvent.newBuilder().withEvent(event).withFields(fields).build();
+
+ SplunkEventCoder coder = SplunkEventCoder.of();
+ try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
+ coder.encode(actualEvent, bos);
+ try (ByteArrayInputStream bin = new
ByteArrayInputStream(bos.toByteArray())) {
+ SplunkEvent decodedEvent = coder.decode(bin);
+ assertEquals(decodedEvent, actualEvent);
+ }
+ }
+ }
+
+ /**
+ * Tests whether {@link SplunkEventCoder} is able to decode a {@link
SplunkEvent} encoded using
+ * the older coder version 1 (commit f0ff6cc).
+ */
+ @Test
+ public void testBackwardsCompatibility_canDecodeVersion1() throws
IOException, DecoderException {
+
+ SplunkEvent expectedEvent =
+ SplunkEvent.newBuilder()
+ .withEvent("e")
+ .withHost("h")
+ .withIndex("i")
+ .withSource("s")
+ .withSourceType("st")
+ .withTime(1234L)
+ .build();
+
+ String hex = "0100000000000004d2010168010173010273740101690165";
+ SplunkEvent actualEvent = SplunkEventCoder.of().decode(fromHex(hex));
+
+ assertEquals(expectedEvent, actualEvent);
+ }
+
+ /**
+ * Tests whether {@link SplunkEventCoder} is able to decode a {@link
SplunkEvent} encoded using
+ * the older coder version 1 (commit f0ff6cc) and having an empty "event"
field.
+ *
+ * <p>An empty field is encoded as <code>00</code>, which may look like the
present/not present
+ * marker for the "fields" field in V2.
+ */
+ @Test
+ public void testBackwardsCompatibility_canDecodeVersion1withEmptyEvent()
+ throws IOException, DecoderException {
+
+ SplunkEvent expectedEvent =
+ SplunkEvent.newBuilder()
+ .withEvent("")
+ .withHost("h")
+ .withIndex("i")
+ .withSource("s")
+ .withSourceType("st")
+ .withTime(1234L)
+ .build();
+
+ String hex = "0100000000000004d20101680101730102737401016900";
+ SplunkEvent actualEvent = SplunkEventCoder.of().decode(fromHex(hex));
+
+ assertEquals(expectedEvent, actualEvent);
+ }
+
+ /**
+ * Tests whether {@link SplunkEventCoder} is able to decode a {@link
SplunkEvent} encoded using
+ * the older coder version 1 (commit f0ff6cc) and having the "event" field
of length 1.
+ *
+ * <p>This is a special case when "event" is of length 1 and the first
character code is 00. This
+ * is encoded as byte sequence 01 00 by V1 coder, which can be treated as an
empty "fields" field
+ * by V2 decoder.
+ */
+ @Test
+ public void testBackwardsCompatibility_canDecodeVersion1withEventLength1()
+ throws IOException, DecoderException {
+
+ SplunkEvent expectedEvent =
+ SplunkEvent.newBuilder()
+ .withEvent(new String(new byte[] {0}, StandardCharsets.UTF_8))
+ .withHost("h")
+ .withIndex("i")
+ .withSource("s")
+ .withSourceType("st")
+ .withTime(1234L)
+ .build();
+
+ String hex = "0100000000000004d2010168010173010273740101690100";
+ SplunkEvent actualEvent = SplunkEventCoder.of().decode(fromHex(hex));
+
+ assertEquals(expectedEvent, actualEvent);
+ }
+
+ /**
+ * Tests whether {@link SplunkEventCoder} is able to decode a {@link
SplunkEvent} encoded using
+ * the older coder version 2 (commit 5e53040), without the newly added
"fields" field.
+ */
+ @Test
+ public void testBackwardsCompatibility_canDecodeVersion2() throws
IOException, DecoderException {
+
+ SplunkEvent expectedEvent =
+ SplunkEvent.newBuilder()
+ .withEvent("e")
+ .withHost("h")
+ .withIndex("i")
+ .withSource("s")
+ .withSourceType("st")
+ .withTime(1234L)
+ .build();
+
+ String hex = "0100000000000004d201016801017301027374010169000165";
+ SplunkEvent actualEvent = SplunkEventCoder.of().decode(fromHex(hex));
+
+ assertEquals(expectedEvent, actualEvent);
+ }
+
+ /**
+ * Tests whether {@link SplunkEventCoder} is able to decode a {@link
SplunkEvent} encoded using
+ * the older coder version 2 (commit 5e53040), with the newly added "fields"
field.
+ */
+ @Test
+ public void testBackwardsCompatibility_canDecodeVersion2withFields()
+ throws IOException, DecoderException {
+
+ JsonObject fields = new JsonObject();
+ fields.addProperty("k", "v");
+
+ SplunkEvent expectedEvent =
+ SplunkEvent.newBuilder()
+ .withEvent("e")
+ .withHost("h")
+ .withIndex("i")
+ .withSource("s")
+ .withSourceType("st")
+ .withTime(1234L)
+ .withFields(fields)
+ .build();
+
+ String hex =
"0100000000000004d20101680101730102737401016901097b226b223a2276227d0165";
+ SplunkEvent actualEvent = SplunkEventCoder.of().decode(fromHex(hex));
+
+ assertEquals(expectedEvent, actualEvent);
+ }
+
+ private static InputStream fromHex(String hex) throws DecoderException {
+ byte[] b = Hex.decodeHex(hex);
+ return new ByteArrayInputStream(b);
+ }
+}
diff --git
a/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkEventTest.java
b/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkEventTest.java
index 29769526d24..749086bac43 100644
---
a/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkEventTest.java
+++
b/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkEventTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.splunk;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
+import com.google.gson.JsonObject;
import org.junit.Test;
/** Unit tests for {@link SplunkEvent} class. */
@@ -34,6 +35,8 @@ public class SplunkEventTest {
String source = "test-source";
String sourceType = "test-source-type";
Long time = 123456789L;
+ JsonObject fields = new JsonObject();
+ fields.addProperty("test-key", "test-value");
SplunkEvent actualEvent =
SplunkEvent.newBuilder()
@@ -43,6 +46,7 @@ public class SplunkEventTest {
.withSource(source)
.withSourceType(sourceType)
.withTime(time)
+ .withFields(fields)
.create();
assertEquals(
@@ -53,6 +57,7 @@ public class SplunkEventTest {
.withSource(source)
.withSourceType(sourceType)
.withTime(time)
+ .withFields(fields)
.create(),
actualEvent);
diff --git
a/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkEventWriterTest.java
b/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkEventWriterTest.java
index 3633844ab6d..f4d8c1a5e13 100644
---
a/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkEventWriterTest.java
+++
b/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkEventWriterTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.splunk;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -62,6 +63,21 @@ public class SplunkEventWriterTest {
private MockServerClient mockServerClient;
+ @Test
+ public void testMissingURLProtocol() {
+ assertFalse(SplunkEventWriter.isValidUrlFormat("test-url"));
+ }
+
+ @Test
+ public void testInvalidURL() {
+ assertFalse(SplunkEventWriter.isValidUrlFormat("http://1.2.3"));
+ }
+
+ @Test
+ public void testValidURL() {
+ assertTrue(SplunkEventWriter.isValidUrlFormat("http://test-url"));
+ }
+
@Test
public void eventWriterMissingURL() {
@@ -71,13 +87,51 @@ public class SplunkEventWriterTest {
assertTrue(thrown.getMessage().contains("url needs to be provided"));
}
+ @Test
+ public void eventWriterMissingURLProtocol() {
+ Exception thrown =
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> SplunkEventWriter.newBuilder().withUrl("test-url").build());
+
+
assertTrue(thrown.getMessage().contains(SplunkEventWriter.INVALID_URL_FORMAT_MESSAGE));
+ }
+
+ /** Test building {@link SplunkEventWriter} with an invalid URL. */
+ @Test
+ public void eventWriterInvalidURL() {
+ Exception thrown =
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
SplunkEventWriter.newBuilder().withUrl("http://1.2.3").build());
+
+
assertTrue(thrown.getMessage().contains(SplunkEventWriter.INVALID_URL_FORMAT_MESSAGE));
+ }
+
+ /**
+ * Test building {@link SplunkEventWriter} with the
'services/collector/event' path appended to
+ * the URL.
+ */
+ @Test
+ public void eventWriterFullEndpoint() {
+ Exception thrown =
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ SplunkEventWriter.newBuilder()
+ .withUrl("http://test-url:8088/services/collector/event")
+ .build());
+
+
assertTrue(thrown.getMessage().contains(SplunkEventWriter.INVALID_URL_FORMAT_MESSAGE));
+ }
+
@Test
public void eventWriterMissingToken() {
Exception thrown =
assertThrows(
NullPointerException.class,
- () -> SplunkEventWriter.newBuilder().withUrl("test-url").build());
+ () ->
SplunkEventWriter.newBuilder().withUrl("http://test-url").build());
assertTrue(thrown.getMessage().contains("token needs to be provided"));
}
@@ -86,7 +140,7 @@ public class SplunkEventWriterTest {
public void eventWriterDefaultBatchCountAndValidation() {
SplunkEventWriter writer =
-
SplunkEventWriter.newBuilder().withUrl("test-url").withToken("test-token").build();
+
SplunkEventWriter.newBuilder().withUrl("http://test-url").withToken("test-token").build();
assertNull(writer.inputBatchCount());
assertNull(writer.disableCertificateValidation());
@@ -99,7 +153,7 @@ public class SplunkEventWriterTest {
Boolean certificateValidation = false;
SplunkEventWriter writer =
SplunkEventWriter.newBuilder()
- .withUrl("test-url")
+ .withUrl("http://test-url")
.withToken("test-token")
.withInputBatchCount(StaticValueProvider.of(batchCount))
.withDisableCertificateValidation(StaticValueProvider.of(certificateValidation))
@@ -144,7 +198,6 @@ public class SplunkEventWriterTest {
PCollection<SplunkWriteError> actual =
pipeline
.apply("Create Input data", Create.of(testEvents))
- // .withCoder(KvCoder.of(BigEndianIntegerCoder.of(),
SplunkEventCoder.of())))
.apply(
"SplunkEventWriter",
ParDo.of(
@@ -200,7 +253,6 @@ public class SplunkEventWriterTest {
PCollection<SplunkWriteError> actual =
pipeline
.apply("Create Input data", Create.of(testEvents))
- // .withCoder(KvCoder.of(BigEndianIntegerCoder.of(),
SplunkEventCoder.of())))
.apply(
"SplunkEventWriter",
ParDo.of(
@@ -246,7 +298,6 @@ public class SplunkEventWriterTest {
PCollection<SplunkWriteError> actual =
pipeline
.apply("Create Input data", Create.of(testEvents))
- // .withCoder(KvCoder.of(BigEndianIntegerCoder.of(),
SplunkEventCoder.of())))
.apply(
"SplunkEventWriter",
ParDo.of(
diff --git
a/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkIOTest.java
b/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkIOTest.java
index 32c98513ea2..d2cfd59aace 100644
---
a/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkIOTest.java
+++
b/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/SplunkIOTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.splunk;
+import com.google.gson.JsonObject;
import java.util.List;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
@@ -63,7 +64,8 @@ public class SplunkIOTest {
int testPort = mockServerRule.getPort();
String url = Joiner.on(':').join("http://localhost", testPort);
String token = "test-token";
-
+ JsonObject fields = new JsonObject();
+ fields.addProperty("customfield", 1);
List<SplunkEvent> testEvents =
ImmutableList.of(
SplunkEvent.newBuilder()
@@ -73,6 +75,7 @@ public class SplunkIOTest {
.withSource("test-source-1")
.withSourceType("test-source-type-1")
.withTime(12345L)
+ .withFields(fields)
.create(),
SplunkEvent.newBuilder()
.withEvent("test-event-2")
@@ -81,11 +84,12 @@ public class SplunkIOTest {
.withSource("test-source-2")
.withSourceType("test-source-type-2")
.withTime(12345L)
+ .withFields(fields)
.create());
PCollection<SplunkWriteError> actual =
pipeline
- .apply("Create Input data", Create.of(testEvents)) //
.withCoder(SplunkEventCoder.of()))
+ .apply("Create Input data", Create.of(testEvents))
.apply(
"SplunkIO",
SplunkIO.write(url,
token).withParallelism(1).withBatchCount(testEvents.size()));
@@ -132,7 +136,7 @@ public class SplunkIOTest {
PCollection<SplunkWriteError> actual =
pipeline
- .apply("Create Input data", Create.of(testEvents)) //
.withCoder(SplunkEventCoder.of()))
+ .apply("Create Input data", Create.of(testEvents))
.apply(
"SplunkIO",
SplunkIO.write(url, token)
@@ -182,7 +186,7 @@ public class SplunkIOTest {
PCollection<SplunkWriteError> actual =
pipeline
- .apply("Create Input data", Create.of(testEvents)) //
.withCoder(SplunkEventCoder.of()))
+ .apply("Create Input data", Create.of(testEvents))
.apply(
"SplunkIO",
SplunkIO.write(url,
token).withParallelism(testParallelism).withBatchCount(1));