Repository: nifi Updated Branches: refs/heads/master bd4f31a4c -> 4249fc943
NIFI-1284 Creating inner class for SiteToSiteClientConfig to fix serialization issue Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4249fc94 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4249fc94 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4249fc94 Branch: refs/heads/master Commit: 4249fc943a5c6487cd2c19c646497bb95893a96e Parents: bd4f31a Author: Bryan Bende <[email protected]> Authored: Tue Dec 15 16:50:17 2015 -0500 Committer: Bryan Bende <[email protected]> Committed: Fri Dec 18 13:08:51 2015 -0500 ---------------------------------------------------------------------- nifi-commons/nifi-site-to-site-client/pom.xml | 6 + .../nifi/remote/client/SiteToSiteClient.java | 261 ++++++++++++------- .../client/socket/TestSiteToSiteClient.java | 46 +++- 3 files changed, 208 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/4249fc94/nifi-commons/nifi-site-to-site-client/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/pom.xml b/nifi-commons/nifi-site-to-site-client/pom.xml index 457f47d..52179cb 100644 --- a/nifi-commons/nifi-site-to-site-client/pom.xml +++ b/nifi-commons/nifi-site-to-site-client/pom.xml @@ -50,5 +50,11 @@ <artifactId>junit</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>com.esotericsoftware.kryo</groupId> + <artifactId>kryo</artifactId> + <version>2.24.0</version> + <scope>test</scope> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/nifi/blob/4249fc94/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java index 78237b9..1581c42 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java @@ -513,104 +513,7 @@ public interface SiteToSiteClient extends Closeable { * but does not create a SiteToSiteClient */ public SiteToSiteClientConfig buildConfig() { - return new SiteToSiteClientConfig() { - private static final long serialVersionUID = 1L; - - @Override - public boolean isUseCompression() { - return Builder.this.isUseCompression(); - } - - @Override - public String getUrl() { - return Builder.this.getUrl(); - } - - @Override - public long getTimeout(final TimeUnit timeUnit) { - return Builder.this.getTimeout(timeUnit); - } - - @Override - public long getIdleConnectionExpiration(final TimeUnit timeUnit) { - return Builder.this.getIdleConnectionExpiration(timeUnit); - } - - @Override - public SSLContext getSslContext() { - return Builder.this.getSslContext(); - } - - @Override - public String getPortName() { - return Builder.this.getPortName(); - } - - @Override - public String getPortIdentifier() { - return Builder.this.getPortIdentifier(); - } - - @Override - public long getPenalizationPeriod(final TimeUnit timeUnit) { - return Builder.this.getPenalizationPeriod(timeUnit); - } - - @Override - public File getPeerPersistenceFile() { - return Builder.this.getPeerPersistenceFile(); - } - - @Override - public EventReporter getEventReporter() { - return Builder.this.getEventReporter(); - } - - @Override - public long getPreferredBatchDuration(final TimeUnit timeUnit) { - return timeUnit.convert(batchNanos, TimeUnit.NANOSECONDS); - } - - @Override - public long getPreferredBatchSize() { - return batchSize; - } - - @Override - public int getPreferredBatchCount() { - return batchCount; - } - - @Override - public String getKeystoreFilename() { - return keystoreFilename; - } - - @Override - public String getKeystorePassword() { - return keystorePass; - } - - @Override - public KeystoreType getKeystoreType() { - return keystoreType; - } - - @Override - public String getTruststoreFilename() { - return truststoreFilename; - } - - @Override - public String getTruststorePassword() { - return truststorePass; - } - - @Override - public KeystoreType getTruststoreType() { - return truststoreType; - } - }; + return new StandardSiteToSiteClientConfig(this); } /** @@ -764,8 +667,168 @@ public interface SiteToSiteClient extends Closeable { } - public abstract class SerializableSiteToSiteClientConfig implements SiteToSiteClientConfig, Serializable { + class StandardSiteToSiteClientConfig implements SiteToSiteClientConfig, Serializable { + private static final long serialVersionUID = 1L; + private final String url; + private final long timeoutNanos; + private final long penalizationNanos; + private final long idleExpirationNanos; + private final SSLContext sslContext; + private final String keystoreFilename; + private final String keystorePass; + private final KeystoreType keystoreType; + private final String truststoreFilename; + private final String truststorePass; + private final KeystoreType truststoreType; + private final EventReporter eventReporter; + private final File peerPersistenceFile; + private final boolean useCompression; + private final String portName; + private final String portIdentifier; + private final int batchCount; + private final long batchSize; + private final long batchNanos; + + // some serialization frameworks require a default constructor + private StandardSiteToSiteClientConfig() { + this.url = null; + this.timeoutNanos = 0; + this.penalizationNanos = 0; + this.idleExpirationNanos = 0; + this.sslContext = null; + this.keystoreFilename = null; + this.keystorePass = null; + this.keystoreType = null; + this.truststoreFilename = null; + this.truststorePass = null; + this.truststoreType = null; + this.eventReporter = null; + this.peerPersistenceFile = null; + this.useCompression = false; + this.portName = null; + this.portIdentifier = null; + this.batchCount = 0; + this.batchSize = 0; + this.batchNanos = 0; + } + + private StandardSiteToSiteClientConfig(final SiteToSiteClient.Builder builder) { + this.url = builder.url; + this.timeoutNanos = builder.timeoutNanos; + this.penalizationNanos = builder.penalizationNanos; + this.idleExpirationNanos = builder.idleExpirationNanos; + this.sslContext = builder.sslContext; + this.keystoreFilename = builder.keystoreFilename; + this.keystorePass = builder.keystorePass; + this.keystoreType = builder.keystoreType; + this.truststoreFilename = builder.truststoreFilename; + this.truststorePass = builder.truststorePass; + this.truststoreType = builder.truststoreType; + this.eventReporter = builder.eventReporter; + this.peerPersistenceFile = builder.peerPersistenceFile; + this.useCompression = builder.useCompression; + this.portName = builder.portName; + this.portIdentifier = builder.portIdentifier; + this.batchCount = builder.batchCount; + this.batchSize = builder.batchSize; + this.batchNanos = builder.batchNanos; + } + + @Override + public boolean isUseCompression() { + return useCompression; + } + + @Override + public String getUrl() { + return url; + } + + @Override + public long getTimeout(final TimeUnit timeUnit) { + return timeUnit.convert(timeoutNanos, TimeUnit.NANOSECONDS); + } + + @Override + public long getIdleConnectionExpiration(final TimeUnit timeUnit) { + return timeUnit.convert(idleExpirationNanos, TimeUnit.NANOSECONDS); + } + + @Override + public SSLContext getSslContext() { + return sslContext; + } + + @Override + public String getPortName() { + return portName; + } + + @Override + public String getPortIdentifier() { + return portIdentifier; + } + + @Override + public long getPenalizationPeriod(final TimeUnit timeUnit) { + return timeUnit.convert(penalizationNanos, TimeUnit.NANOSECONDS); + } + + @Override + public File getPeerPersistenceFile() { + return peerPersistenceFile; + } + + @Override + public EventReporter getEventReporter() { + return eventReporter; + } + + @Override + public long getPreferredBatchDuration(final TimeUnit timeUnit) { + return timeUnit.convert(batchNanos, TimeUnit.NANOSECONDS); + } + + @Override + public long getPreferredBatchSize() { + return batchSize; + } + + @Override + public int getPreferredBatchCount() { + return batchCount; + } + + @Override + public String getKeystoreFilename() { + return keystoreFilename; + } + + @Override + public String getKeystorePassword() { + return keystorePass; + } + + @Override + public KeystoreType getKeystoreType() { + return keystoreType; + } + + @Override + public String getTruststoreFilename() { + return truststoreFilename; + } + + @Override + public String getTruststorePassword() { + return truststorePass; + } + + @Override + public KeystoreType getTruststoreType() { + return truststoreType; + } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/4249fc94/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java index 4938f20..194a167 100644 --- a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java @@ -16,22 +16,27 @@ */ package org.apache.nifi.remote.client.socket; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.HashMap; -import java.util.Map; - +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.remote.client.SiteToSiteClientConfig; import org.apache.nifi.remote.protocol.DataPacket; import org.apache.nifi.remote.util.StandardDataPacket; +import org.apache.nifi.stream.io.ByteArrayOutputStream; import org.apache.nifi.stream.io.StreamUtils; import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; + public class TestSiteToSiteClient { @Test @@ -100,4 +105,33 @@ public class TestSiteToSiteClient { } } + @Test + public void testSerialization() { + final SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder() + .url("http://localhost:8080/nifi") + .portName("input") + .buildConfig(); + + final Kryo kryo = new Kryo(); + + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + final Output output = new Output(out); + + try { + kryo.writeObject(output, clientConfig); + } finally { + output.close(); + } + + final ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray()); + final Input input = new Input(in); + + try { + SiteToSiteClientConfig clientConfig2 = kryo.readObject(input, SiteToSiteClient.StandardSiteToSiteClientConfig.class); + Assert.assertEquals(clientConfig.getUrl(), clientConfig2.getUrl()); + } finally { + input.close(); + } + } + }
