This is an automated email from the ASF dual-hosted git repository. riemer pushed a commit to branch enhance-rest-sink in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit a31c43e3b69e946cfae08fc60eca0d485e64b020 Author: Dominik Riemer <[email protected]> AuthorDate: Tue Apr 22 13:16:59 2025 +0200 Fix checkstyle issues --- .../jvm/BrokerSinksExtensionModuleExport.java | 5 +- .../jvm/migrations/RestSinkMigrationV1.java | 197 +++++++------ .../brokers/jvm/rest/RestHeaderConfiguration.java | 6 +- .../sinks/brokers/jvm/rest/RestSink.java | 327 +++++++++++---------- 4 files changed, 275 insertions(+), 260 deletions(-) diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokerSinksExtensionModuleExport.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokerSinksExtensionModuleExport.java index 86fe680f7c..df4fce7bf2 100644 --- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokerSinksExtensionModuleExport.java +++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/BrokerSinksExtensionModuleExport.java @@ -24,6 +24,7 @@ import org.apache.streampipes.extensions.api.migration.IModelMigrator; import org.apache.streampipes.extensions.api.pe.IStreamPipesPipelineElement; import org.apache.streampipes.sinks.brokers.jvm.bufferrest.BufferRestPublisherSink; import org.apache.streampipes.sinks.brokers.jvm.jms.JmsPublisherSink; +import org.apache.streampipes.sinks.brokers.jvm.migrations.RestSinkMigrationV1; import org.apache.streampipes.sinks.brokers.jvm.rabbitmq.RabbitMqPublisherSink; import org.apache.streampipes.sinks.brokers.jvm.rest.RestSink; import org.apache.streampipes.sinks.brokers.jvm.websocket.WebsocketServerSink; @@ -50,6 +51,8 @@ public class BrokerSinksExtensionModuleExport implements IExtensionModuleExport @Override public List<IModelMigrator<?, ?>> migrators() { - return Collections.emptyList(); + return List.of( + new RestSinkMigrationV1() + ); } } diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/migrations/RestSinkMigrationV1.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/migrations/RestSinkMigrationV1.java index cfef1440e9..3a3bd4a140 100644 --- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/migrations/RestSinkMigrationV1.java +++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/migrations/RestSinkMigrationV1.java @@ -18,8 +18,6 @@ package org.apache.streampipes.sinks.brokers.jvm.migrations; -import java.util.List; - import org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor; import org.apache.streampipes.extensions.api.migration.IDataSinkMigrator; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix; @@ -33,96 +31,109 @@ import org.apache.streampipes.sdk.StaticProperties; import org.apache.streampipes.sdk.helpers.Labels; import org.apache.streampipes.sinks.brokers.jvm.rest.RestSink; +import java.util.List; + public class RestSinkMigrationV1 implements IDataSinkMigrator { - @Override - public ModelMigratorConfig config() { - return new ModelMigratorConfig( - "org.apache.streampipes.sinks.brokers.jvm.rest", - SpServiceTagPrefix.DATA_SINK, - 0, - 1 - ); - } - - @Override - public MigrationResult<DataSinkInvocation> migrate(DataSinkInvocation element, - IDataSinkParameterExtractor extractor) throws RuntimeException { - // Extract the URL value from the old configuration - String urlValue = extractor.singleValueParameter(RestSink.URL_KEY, String.class); - if (urlValue == null) { - return MigrationResult.failure(element, "URL property not found in old configuration"); - } - - addRetryDelayKey(element); - addIsRetryEnabledKey(element); - addRetryMaxRetriesKey(element); - addHeaderCollectionKeys(element); - - return MigrationResult.success(element); - } - - public void addRetryDelayKey(DataSinkInvocation element) { - var label = Labels.from(RestSink.RETRY_DELAY_MS_KEY, "Retry Delay (ms)", "Duration in ms to wait for request to " + - "retry."); - var staticProperty = new RuntimeResolvableAnyStaticProperty( - label.getInternalId(), - label.getLabel(), - label.getDescription() - ); - - element.getStaticProperties().add(staticProperty); - } - - public void addIsRetryEnabledKey(DataSinkInvocation element) { - var label = Labels.from(RestSink.IS_RETRY_ENABLED_KEY, "Enable Retry", - "If enabled, the request will be retried at an interval defined by the retry delay up to max retries."); - var staticProperty = new SlideToggleStaticProperty( - label.getInternalId(), - label.getLabel(), - label.getDescription(), - false - ); - element.getStaticProperties().add(staticProperty); - } - - public void addRetryMaxRetriesKey(DataSinkInvocation element) { - var label = Labels.from(RestSink.RETRY_MAX_RETRIES_KEY, "Max Retries", - "The maximum number of retries allowed for request to retry."); - var staticProperty = new RuntimeResolvableAnyStaticProperty( - label.getInternalId(), - label.getLabel(), - label.getDescription() - ); - element.getStaticProperties().add(staticProperty); - } - - public void addHeaderCollectionKeys(DataSinkInvocation element) { - StaticPropertyGroup staticPropertyGroup = new StaticPropertyGroup(); - staticPropertyGroup.setLabel("Request Headers"); - staticPropertyGroup.setDescription("Request Headers"); - staticPropertyGroup.setInternalName(RestSink.HEADER_COLLECTION); - staticPropertyGroup.setHorizontalRendering(false); - - var headerKeyLabel = StaticProperties.stringFreeTextProperty( - Labels.from(RestSink.HEADER_KEY, "Header", "Optional custom headers to be included with the REST request.") - ); - headerKeyLabel.setValue(""); - headerKeyLabel.setOptional(true); - - var headerValueLabel = StaticProperties.stringFreeTextProperty( - Labels.from(RestSink.HEADER_VALUE, "Header Value", "Optional custom headers to be included with the REST " + - "request.") - ); - headerValueLabel.setValue(""); - headerValueLabel.setOptional(true); - - staticPropertyGroup.getStaticProperties().addAll( - List.of( - headerKeyLabel, - headerValueLabel - ) - ); - element.getStaticProperties().add(staticPropertyGroup); - } -} \ No newline at end of file + @Override + public ModelMigratorConfig config() { + return new ModelMigratorConfig( + "org.apache.streampipes.sinks.brokers.jvm.rest", + SpServiceTagPrefix.DATA_SINK, + 0, + 1 + ); + } + + @Override + public MigrationResult<DataSinkInvocation> migrate(DataSinkInvocation element, + IDataSinkParameterExtractor extractor) throws RuntimeException { + // Extract the URL value from the old configuration + String urlValue = extractor.singleValueParameter(RestSink.URL_KEY, String.class); + if (urlValue == null) { + return MigrationResult.failure(element, "URL property not found in old configuration"); + } + + addRetryDelayKey(element); + addIsRetryEnabledKey(element); + addRetryMaxRetriesKey(element); + addHeaderCollectionKeys(element); + + return MigrationResult.success(element); + } + + public void addRetryDelayKey(DataSinkInvocation element) { + var label = Labels.from( + RestSink.RETRY_DELAY_MS_KEY, + "Retry Delay (ms)", "Duration in ms to wait for request to " + + "retry." + ); + var staticProperty = new RuntimeResolvableAnyStaticProperty( + label.getInternalId(), + label.getLabel(), + label.getDescription() + ); + + element.getStaticProperties().add(staticProperty); + } + + public void addIsRetryEnabledKey(DataSinkInvocation element) { + var label = Labels.from(RestSink.IS_RETRY_ENABLED_KEY, "Enable Retry", + "If enabled, the request will be retried at an interval defined by the retry delay up to max retries."); + var staticProperty = new SlideToggleStaticProperty( + label.getInternalId(), + label.getLabel(), + label.getDescription(), + false + ); + element.getStaticProperties().add(staticProperty); + } + + public void addRetryMaxRetriesKey(DataSinkInvocation element) { + var label = Labels.from(RestSink.RETRY_MAX_RETRIES_KEY, "Max Retries", + "The maximum number of retries allowed for request to retry."); + var staticProperty = new RuntimeResolvableAnyStaticProperty( + label.getInternalId(), + label.getLabel(), + label.getDescription() + ); + element.getStaticProperties().add(staticProperty); + } + + public void addHeaderCollectionKeys(DataSinkInvocation element) { + StaticPropertyGroup staticPropertyGroup = new StaticPropertyGroup(); + staticPropertyGroup.setLabel("Request Headers"); + staticPropertyGroup.setDescription("Request Headers"); + staticPropertyGroup.setInternalName(RestSink.HEADER_COLLECTION); + staticPropertyGroup.setHorizontalRendering(false); + + var headerKeyLabel = StaticProperties.stringFreeTextProperty( + Labels.from( + RestSink.HEADER_KEY, + "Header", + "Optional custom headers to be included with the REST request." + ) + ); + headerKeyLabel.setValue(""); + headerKeyLabel.setOptional(true); + + var headerValueLabel = StaticProperties.stringFreeTextProperty( + Labels.from( + RestSink.HEADER_VALUE, + "Header Value", + "Optional custom headers to be included with the REST " + + "request." + ) + ); + headerValueLabel.setValue(""); + headerValueLabel.setOptional(true); + + staticPropertyGroup.getStaticProperties().addAll( + List.of( + headerKeyLabel, + headerValueLabel + ) + ); + element.getStaticProperties().add(staticPropertyGroup); + } +} diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestHeaderConfiguration.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestHeaderConfiguration.java index b27fec2e83..d3b94893ce 100644 --- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestHeaderConfiguration.java +++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestHeaderConfiguration.java @@ -19,7 +19,7 @@ package org.apache.streampipes.sinks.brokers.jvm.rest; public record RestHeaderConfiguration( - String headerKey, - String headerValue + String headerKey, + String headerValue ) { -} \ No newline at end of file +} diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestSink.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestSink.java index 02ecda2b97..e1f96eea8f 100644 --- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestSink.java +++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/rest/RestSink.java @@ -18,14 +18,6 @@ package org.apache.streampipes.sinks.brokers.jvm.rest; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.http.HttpResponse; -import org.apache.http.client.fluent.Request; -import org.apache.http.client.fluent.Response; -import org.apache.http.entity.ContentType; import org.apache.streampipes.commons.exceptions.SpRuntimeException; import org.apache.streampipes.dataformat.JsonDataFormatDefinition; import org.apache.streampipes.extensions.api.extractor.IDataSinkParameterExtractor; @@ -47,163 +39,172 @@ import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor; import org.apache.streampipes.sdk.helpers.EpRequirements; import org.apache.streampipes.sdk.helpers.Labels; import org.apache.streampipes.sdk.helpers.Locales; + +import org.apache.http.HttpResponse; +import org.apache.http.client.fluent.Request; +import org.apache.http.client.fluent.Response; +import org.apache.http.entity.ContentType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + public class RestSink implements IStreamPipesDataSink { - private static final Logger LOG = LoggerFactory.getLogger(RestSink.class); - - public static final String ID = "org.apache.streampipes.sinks.brokers.jvm.rest"; - public static final String URL_KEY = "url-key"; - public static final String HEADER_COLLECTION = "header-collection"; - public static final String HEADER_KEY = "header-key"; - public static final String HEADER_VALUE = "header-value"; - public static final String IS_RETRY_ENABLED_KEY = "is-retry-enabled-key"; - public static final String RETRY_DELAY_MS_KEY = "retry-delay-ms-key"; - public static final String RETRY_MAX_RETRIES_KEY = "retry-max-retries-key"; - private String url; - private JsonDataFormatDefinition jsonDataFormatDefinition; - private List<RestHeaderConfiguration> headerConfigurations = new ArrayList<>(); - private boolean isRetryEnabled; - private int maxRetries; - private int retryDelayMs; - - @Override - public IDataSinkConfiguration declareConfig() { - return DataSinkConfiguration.create( - RestSink::new, - DataSinkBuilder.create(ID, 1) - .category(DataSinkType.FORWARD) - .withAssets(ExtensionAssetType.DOCUMENTATION) - .withLocales(Locales.EN) - .requiredTextParameter( - Labels.withId(URL_KEY), - false, - false - ) - .requiredSlideToggle( - Labels.withId(IS_RETRY_ENABLED_KEY), - false - ) - .requiredStaticProperty(StaticProperties.integerFreeTextProperty( - Labels.withId(RETRY_DELAY_MS_KEY), - 100 - )) - .requiredStaticProperty(StaticProperties.integerFreeTextProperty( - Labels.withId(RETRY_MAX_RETRIES_KEY), - 3 - )) - .requiredStaticProperty( - StaticProperties.collection( - Labels.withId(HEADER_COLLECTION), - false, - StaticProperties.stringFreeTextProperty(Labels.withId(HEADER_KEY)), - StaticProperties.stringFreeTextProperty(Labels.withId(HEADER_VALUE)) - ) - ) - .requiredStream(StreamRequirementsBuilder - .create() - .requiredProperty(EpRequirements.anyProperty()) - .build()) - .build() - ); - } - - private List<RestHeaderConfiguration> getHeaderConfigurations(IDataSinkParameterExtractor extractor) { - List<RestHeaderConfiguration> headers = new ArrayList<>(); - var csp = (CollectionStaticProperty) extractor.getStaticPropertyByName(HEADER_COLLECTION); - - for (StaticProperty member : csp.getMembers()) { - var memberExtractor = getMemberExtractor(member); - - var headerConfiguration = getHeaders(memberExtractor); - - headers.add(headerConfiguration); - } - return headers; - } - - private StaticPropertyExtractor getMemberExtractor(StaticProperty member) { - return StaticPropertyExtractor.from( - ((StaticPropertyGroup) member).getStaticProperties(), - new ArrayList<>() - ); - } - - private RestHeaderConfiguration getHeaders(StaticPropertyExtractor memberExtractor) { - var headerKey = memberExtractor.textParameter(HEADER_KEY); - var headerValue = memberExtractor.textParameter(HEADER_VALUE); - - return new RestHeaderConfiguration(headerKey, headerValue); - } - - @Override - public void onPipelineStarted(IDataSinkParameters parameters, - EventSinkRuntimeContext runtimeContext) throws SpRuntimeException { - jsonDataFormatDefinition = new JsonDataFormatDefinition(); - url = parameters.extractor().singleValueParameter(URL_KEY, String.class); - headerConfigurations = getHeaderConfigurations(parameters.extractor()); - isRetryEnabled = parameters.extractor().slideToggleValue(IS_RETRY_ENABLED_KEY); - maxRetries = isRetryEnabled ? parameters.extractor().singleValueParameter(RETRY_DELAY_MS_KEY, Integer.class) : 0; - retryDelayMs = isRetryEnabled ? parameters.extractor().singleValueParameter(RETRY_DELAY_MS_KEY, Integer.class) : 0; - } - - @Override - public void onEvent(Event event) { - try { - // Set maximum attempts to 1 if not enabled - int maxAttempts = 1; - if (isRetryEnabled) { - maxAttempts = maxRetries; - } - - byte[] json = jsonDataFormatDefinition.fromMap(event.getRaw()); - for (int attempt = 0; attempt < maxAttempts; attempt++) { - try { - Request request = Request.Post(url) - .bodyByteArray(json, ContentType.APPLICATION_JSON) - .connectTimeout(1000) - .socketTimeout(100000); - - for (RestHeaderConfiguration header : headerConfigurations) { - request.addHeader(header.headerKey(), header.headerValue()); - } - Response response = request.execute(); - HttpResponse httpResponse = response.returnResponse(); - int statusCode = httpResponse.getStatusLine().getStatusCode(); - if (statusCode >= 200 && statusCode < 300) { - LOG.info("Successfully sent event to {} with status {}", url, statusCode); - return; - } else if (statusCode >= 500) { - LOG.warn("Server error {} from {}, retrying... (attempt {}/{})", statusCode, url, attempt + 1, maxRetries); - } else { - LOG.error("Received status {} from {}, not retrying", statusCode, url); - return; - } - } catch (IOException e) { - LOG.warn("IO error when sending to {}, retrying... (attempt {}/{}): {}", url, attempt + 1, maxRetries, - e.getMessage()); - } - if (attempt < maxRetries - 1) { - try { - Thread.sleep(retryDelayMs); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - LOG.error("Interrupted while waiting to retry"); - return; - } - } else { - LOG.error("Failed to send event to {} after {} attempts", url, maxRetries); - } - } - } catch (SpRuntimeException e) { - LOG.error("Error while serializing event: {} Exception: {}", event.getSourceInfo().getSourceId(), e); - } - } - - @Override - public void onPipelineStopped() throws SpRuntimeException { - this.headerConfigurations = null; - } -} \ No newline at end of file + private static final Logger LOG = LoggerFactory.getLogger(RestSink.class); + + public static final String ID = "org.apache.streampipes.sinks.brokers.jvm.rest"; + public static final String URL_KEY = "url-key"; + public static final String HEADER_COLLECTION = "header-collection"; + public static final String HEADER_KEY = "header-key"; + public static final String HEADER_VALUE = "header-value"; + public static final String IS_RETRY_ENABLED_KEY = "is-retry-enabled-key"; + public static final String RETRY_DELAY_MS_KEY = "retry-delay-ms-key"; + public static final String RETRY_MAX_RETRIES_KEY = "retry-max-retries-key"; + private String url; + private JsonDataFormatDefinition jsonDataFormatDefinition; + private List<RestHeaderConfiguration> headerConfigurations = new ArrayList<>(); + private boolean isRetryEnabled; + private int maxRetries; + private int retryDelayMs; + + @Override + public IDataSinkConfiguration declareConfig() { + return DataSinkConfiguration.create( + RestSink::new, + DataSinkBuilder.create(ID, 1) + .category(DataSinkType.FORWARD) + .withAssets(ExtensionAssetType.DOCUMENTATION) + .withLocales(Locales.EN) + .requiredTextParameter( + Labels.withId(URL_KEY), + false, + false + ) + .requiredSlideToggle( + Labels.withId(IS_RETRY_ENABLED_KEY), + false + ) + .requiredStaticProperty(StaticProperties.integerFreeTextProperty( + Labels.withId(RETRY_DELAY_MS_KEY), + 100 + )) + .requiredStaticProperty(StaticProperties.integerFreeTextProperty( + Labels.withId(RETRY_MAX_RETRIES_KEY), + 3 + )) + .requiredStaticProperty( + StaticProperties.collection( + Labels.withId(HEADER_COLLECTION), + false, + StaticProperties.stringFreeTextProperty(Labels.withId(HEADER_KEY)), + StaticProperties.stringFreeTextProperty(Labels.withId(HEADER_VALUE)) + ) + ) + .requiredStream(StreamRequirementsBuilder + .create() + .requiredProperty(EpRequirements.anyProperty()) + .build()) + .build() + ); + } + + private List<RestHeaderConfiguration> getHeaderConfigurations(IDataSinkParameterExtractor extractor) { + List<RestHeaderConfiguration> headers = new ArrayList<>(); + var csp = (CollectionStaticProperty) extractor.getStaticPropertyByName(HEADER_COLLECTION); + + for (StaticProperty member : csp.getMembers()) { + var memberExtractor = getMemberExtractor(member); + + var headerConfiguration = getHeaders(memberExtractor); + + headers.add(headerConfiguration); + } + return headers; + } + + private StaticPropertyExtractor getMemberExtractor(StaticProperty member) { + return StaticPropertyExtractor.from( + ((StaticPropertyGroup) member).getStaticProperties(), + new ArrayList<>() + ); + } + + private RestHeaderConfiguration getHeaders(StaticPropertyExtractor memberExtractor) { + var headerKey = memberExtractor.textParameter(HEADER_KEY); + var headerValue = memberExtractor.textParameter(HEADER_VALUE); + + return new RestHeaderConfiguration(headerKey, headerValue); + } + + @Override + public void onPipelineStarted(IDataSinkParameters parameters, + EventSinkRuntimeContext runtimeContext) throws SpRuntimeException { + jsonDataFormatDefinition = new JsonDataFormatDefinition(); + url = parameters.extractor().singleValueParameter(URL_KEY, String.class); + headerConfigurations = getHeaderConfigurations(parameters.extractor()); + isRetryEnabled = parameters.extractor().slideToggleValue(IS_RETRY_ENABLED_KEY); + maxRetries = isRetryEnabled ? parameters.extractor().singleValueParameter(RETRY_DELAY_MS_KEY, Integer.class) : 0; + retryDelayMs = isRetryEnabled ? parameters.extractor().singleValueParameter(RETRY_DELAY_MS_KEY, Integer.class) : 0; + } + + @Override + public void onEvent(Event event) { + try { + // Set maximum attempts to 1 if not enabled + int maxAttempts = 1; + if (isRetryEnabled) { + maxAttempts = maxRetries; + } + + byte[] json = jsonDataFormatDefinition.fromMap(event.getRaw()); + for (int attempt = 0; attempt < maxAttempts; attempt++) { + try { + Request request = Request.Post(url) + .bodyByteArray(json, ContentType.APPLICATION_JSON) + .connectTimeout(1000) + .socketTimeout(100000); + + for (RestHeaderConfiguration header : headerConfigurations) { + request.addHeader(header.headerKey(), header.headerValue()); + } + Response response = request.execute(); + HttpResponse httpResponse = response.returnResponse(); + int statusCode = httpResponse.getStatusLine().getStatusCode(); + if (statusCode >= 200 && statusCode < 300) { + LOG.info("Successfully sent event to {} with status {}", url, statusCode); + return; + } else if (statusCode >= 500) { + LOG.warn("Server error {} from {}, retrying... (attempt {}/{})", statusCode, url, attempt + 1, maxRetries); + } else { + LOG.error("Received status {} from {}, not retrying", statusCode, url); + return; + } + } catch (IOException e) { + LOG.warn("IO error when sending to {}, retrying... (attempt {}/{}): {}", url, attempt + 1, maxRetries, + e.getMessage()); + } + if (attempt < maxRetries - 1) { + try { + Thread.sleep(retryDelayMs); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + LOG.error("Interrupted while waiting to retry"); + return; + } + } else { + LOG.error("Failed to send event to {} after {} attempts", url, maxRetries); + } + } + } catch (SpRuntimeException e) { + LOG.error("Error while serializing event: {} Exception: {}", event.getSourceInfo().getSourceId(), e); + } + } + + @Override + public void onPipelineStopped() throws SpRuntimeException { + this.headerConfigurations = null; + } +}
