Repository: incubator-streams Updated Branches: refs/heads/STREAMS-214 [created] d72d8f7e1
simple file reader/writer Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d72d8f7e Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d72d8f7e Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d72d8f7e Branch: refs/heads/STREAMS-214 Commit: d72d8f7e174ed528007473c9179444bd1eb109c6 Parents: bfa9466 Author: sblackmon <[email protected]> Authored: Mon Nov 10 19:06:54 2014 -0600 Committer: sblackmon <[email protected]> Committed: Mon Nov 10 19:06:54 2014 -0600 ---------------------------------------------------------------------- streams-contrib/pom.xml | 1 + streams-contrib/streams-persist-file/README.md | 12 ++ streams-contrib/streams-persist-file/pom.xml | 125 ++++++++++++++ .../apache/streams/file/FileConfigurator.java | 51 ++++++ .../apache/streams/file/FilePersistReader.java | 163 +++++++++++++++++++ .../apache/streams/file/FilePersistWriter.java | 107 ++++++++++++ .../apache/streams/file/FileConfiguration.json | 14 ++ .../apache/streams/file/test/FilePersistIT.java | 112 +++++++++++++ .../streams/file/test/TestFilePersist.java | 60 +++++++ 9 files changed, 645 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d72d8f7e/streams-contrib/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml index fcec297..772ce49 100644 --- a/streams-contrib/pom.xml +++ b/streams-contrib/pom.xml @@ -40,6 +40,7 @@ <module>streams-persist-cassandra</module> <module>streams-persist-console</module> <module>streams-persist-elasticsearch</module> + <module>streams-persist-file</module> <module>streams-persist-hbase</module> <module>streams-persist-hdfs</module> <module>streams-persist-kafka</module> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d72d8f7e/streams-contrib/streams-persist-file/README.md ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-file/README.md b/streams-contrib/streams-persist-file/README.md new file mode 100644 index 0000000..30a0d1c --- /dev/null +++ b/streams-contrib/streams-persist-file/README.md @@ -0,0 +1,12 @@ +streams-persist-file +===================== + +Read to / write from File-backed Queue + +Example reader/writer configuration: + + file { + path = "/tmp/file-queue.txt" + } + +Reader will consume lines from Writer \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d72d8f7e/streams-contrib/streams-persist-file/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-file/pom.xml b/streams-contrib/streams-persist-file/pom.xml new file mode 100644 index 0000000..3eed49d --- /dev/null +++ b/streams-contrib/streams-persist-file/pom.xml @@ -0,0 +1,125 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>streams-contrib</artifactId> + <groupId>org.apache.streams</groupId> + <version>0.1-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>streams-persist-file</artifactId> + + <properties> + <tape.version>1.2.3</tape.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-config</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-pojo</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-util</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.squareup</groupId> + <artifactId>tape</artifactId> + <version>${tape.version}</version> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-runtime-local</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-persist-console</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-module-junit4</artifactId> + </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-api-mockito</artifactId> + </dependency> + </dependencies> + <build> + <sourceDirectory>src/main/java</sourceDirectory> + <testSourceDirectory>src/test/java</testSourceDirectory> + <resources> + <resource> + <directory>src/main/resources</directory> + </resource> + </resources> + <testResources> + <testResource> + <directory>src/test/resources</directory> + </testResource> + </testResources> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <executions> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>target/generated-sources/jsonschema2pojo</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.jsonschema2pojo</groupId> + <artifactId>jsonschema2pojo-maven-plugin</artifactId> + <configuration> + <addCompileSourceRoot>true</addCompileSourceRoot> + <generateBuilders>true</generateBuilders> + <sourcePaths> + <sourcePath>src/main/jsonschema</sourcePath> + </sourcePaths> + <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory> + <targetPackage>org.apache.streams.file.pojo</targetPackage> + <useLongIntegers>true</useLongIntegers> + <useJodaDates>true</useJodaDates> + </configuration> + <executions> + <execution> + <goals> + <goal>generate</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d72d8f7e/streams-contrib/streams-persist-file/src/main/java/org/apache/streams/file/FileConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-file/src/main/java/org/apache/streams/file/FileConfigurator.java b/streams-contrib/streams-persist-file/src/main/java/org/apache/streams/file/FileConfigurator.java new file mode 100644 index 0000000..7e1bd96 --- /dev/null +++ b/streams-contrib/streams-persist-file/src/main/java/org/apache/streams/file/FileConfigurator.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.file; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigRenderOptions; +import org.apache.streams.config.StreamsConfigurator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Created by sblackmon on 12/10/13. + */ +public class FileConfigurator { + + private final static Logger LOGGER = LoggerFactory.getLogger(FileConfigurator.class); + + private final static ObjectMapper mapper = new ObjectMapper(); + + public static FileConfiguration detectConfiguration(Config kafka) { + + FileConfiguration fileConfiguration = null; + + try { + fileConfiguration = mapper.readValue(kafka.root().render(ConfigRenderOptions.concise()), FileConfiguration.class); + } catch (Exception e) { + e.printStackTrace(); + LOGGER.warn("Could not parse FileConfiguration"); + } + + return fileConfiguration; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d72d8f7e/streams-contrib/streams-persist-file/src/main/java/org/apache/streams/file/FilePersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-file/src/main/java/org/apache/streams/file/FilePersistReader.java b/streams-contrib/streams-persist-file/src/main/java/org/apache/streams/file/FilePersistReader.java new file mode 100644 index 0000000..75ee8b6 --- /dev/null +++ b/streams-contrib/streams-persist-file/src/main/java/org/apache/streams/file/FilePersistReader.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.file; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Queues; +import com.squareup.tape.QueueFile; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsPersistReader; +import org.apache.streams.core.StreamsResultSet; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Serializable; +import java.math.BigInteger; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Queue; +import java.util.Random; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +public class FilePersistReader implements StreamsPersistReader, Serializable { + + private static final Logger LOGGER = LoggerFactory.getLogger(FilePersistReader.class); + + protected volatile Queue<StreamsDatum> persistQueue; + + private ObjectMapper mapper = new ObjectMapper(); + + private FileConfiguration config; + + private QueueFile queueFile; + + private boolean isStarted = false; + private boolean isStopped = false; + + private ExecutorService executor = Executors.newSingleThreadExecutor(); + + public FilePersistReader() { + this(FileConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("file"))); + } + + public FilePersistReader(FileConfiguration config) { + this.config = config; + } + + @Override + public StreamsResultSet readAll() { + return readCurrent(); + } + + @Override + public void startStream() { + isStarted = true; + } + + @Override + public StreamsResultSet readCurrent() { + + while (!queueFile.isEmpty()) { + try { + byte[] bytes = queueFile.peek(); + ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + BufferedReader buf = new BufferedReader(new InputStreamReader(bais)); + String s = buf.readLine(); + System.out.println(s); + write(new StreamsDatum(s)); + queueFile.remove(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + StreamsResultSet current; + current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue)); + persistQueue.clear(); + + return current; + } + + private void write( StreamsDatum entry ) { + persistQueue.offer(entry); + } + + @Override + public StreamsResultSet readNew(BigInteger bigInteger) { + return null; + } + + @Override + public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) { + return null; + } + + @Override + public boolean isRunning() { + return isStarted && !isStopped; + } + + @Override + public void prepare(Object configurationObject) { + + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + //Handle exception + } + + try { + queueFile = new QueueFile(new File(config.getFile())); + } catch (IOException e) { + e.printStackTrace(); + } + + Preconditions.checkNotNull(queueFile); + + this.persistQueue = new ConcurrentLinkedQueue<>(); + + } + + @Override + public void cleanUp() { + try { + queueFile.close(); + } catch (IOException e) { + e.printStackTrace(); + } finally { + queueFile = null; + isStopped = true; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d72d8f7e/streams-contrib/streams-persist-file/src/main/java/org/apache/streams/file/FilePersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-file/src/main/java/org/apache/streams/file/FilePersistWriter.java b/streams-contrib/streams-persist-file/src/main/java/org/apache/streams/file/FilePersistWriter.java new file mode 100644 index 0000000..bbae9fb --- /dev/null +++ b/streams-contrib/streams-persist-file/src/main/java/org/apache/streams/file/FilePersistWriter.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.file; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.squareup.tape.QueueFile; +import com.typesafe.config.Config; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsPersistWriter; +import org.apache.streams.util.GuidUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.Properties; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +public class FilePersistWriter implements StreamsPersistWriter, Serializable { + + private static final Logger LOGGER = LoggerFactory.getLogger(FilePersistWriter.class); + + protected volatile Queue<StreamsDatum> persistQueue; + + private ObjectMapper mapper; + + private FileConfiguration config; + + private QueueFile queueFile; + + public FilePersistWriter() { + this(FileConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("file"))); + } + + public FilePersistWriter(FileConfiguration config) { + this.config = config; + } + + @Override + public void write(StreamsDatum entry) { + + String key = entry.getId() != null ? entry.getId() : GuidUtils.generateGuid("filewriter"); + + Preconditions.checkArgument(Strings.isNullOrEmpty(key) == false); + Preconditions.checkArgument(entry.getDocument() instanceof String); + Preconditions.checkArgument(Strings.isNullOrEmpty((String)entry.getDocument()) == false); + + byte[] item = ((String)entry.getDocument()).getBytes(); + try { + queueFile.add(item); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void prepare(Object configurationObject) { + + mapper = new ObjectMapper(); + + try { + queueFile = new QueueFile(new File(config.getFile())); + } catch (IOException e) { + e.printStackTrace(); + } + + Preconditions.checkNotNull(queueFile); + + this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>(); + + } + + @Override + public void cleanUp() { + try { + queueFile.close(); + } catch (IOException e) { + e.printStackTrace(); + } finally { + queueFile = null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d72d8f7e/streams-contrib/streams-persist-file/src/main/jsonschema/org/apache/streams/file/FileConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-file/src/main/jsonschema/org/apache/streams/file/FileConfiguration.json b/streams-contrib/streams-persist-file/src/main/jsonschema/org/apache/streams/file/FileConfiguration.json new file mode 100644 index 0000000..5604737 --- /dev/null +++ b/streams-contrib/streams-persist-file/src/main/jsonschema/org/apache/streams/file/FileConfiguration.json @@ -0,0 +1,14 @@ +{ + "type": "object", + "$schema": "http://json-schema.org/draft-03/schema", + "id": "#", + "javaType" : "org.apache.streams.file.FileConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "path": { + "type": "string", + "description": "A file path to read/write from", + "default": "/tmp/streams-file-queue.txt" + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d72d8f7e/streams-contrib/streams-persist-file/src/test/java/org/apache/streams/file/test/FilePersistIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-file/src/test/java/org/apache/streams/file/test/FilePersistIT.java b/streams-contrib/streams-persist-file/src/test/java/org/apache/streams/file/test/FilePersistIT.java new file mode 100644 index 0000000..e7d50bf --- /dev/null +++ b/streams-contrib/streams-persist-file/src/test/java/org/apache/streams/file/test/FilePersistIT.java @@ -0,0 +1,112 @@ +package org.apache.streams.file.test; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Queues; +import org.apache.streams.console.ConsolePersistReader; +import org.apache.streams.console.ConsolePersistWriter; +import org.apache.streams.core.StreamBuilder; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.local.builders.LocalStreamBuilder; +import org.apache.streams.file.FileConfiguration; +import org.apache.streams.file.FilePersistReader; +import org.apache.streams.file.FilePersistWriter; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.internal.util.reflection.Whitebox; +import org.powermock.api.mockito.PowerMockito; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.BufferedInputStream; +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.PrintStream; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.mock; + +/** + * Created by sblackmon on 10/20/14. + */ +public class FilePersistIT { + + private FileConfiguration testConfiguration; + + ConsolePersistReader reader = Mockito.mock(ConsolePersistReader.class); + ConsolePersistWriter writer = Mockito.mock(ConsolePersistWriter.class); + + StreamsDatum testDatum1 = new StreamsDatum("{\"datum\":1}"); + StreamsDatum testDatum2 = new StreamsDatum("{\"datum\":2}"); + StreamsDatum testDatum3 = new StreamsDatum("{\"datum\":3}"); + + @Before + public void prepareTest() { + + testConfiguration = new FileConfiguration(); + //testConfiguration.setFile("./test-queue.txt"); + + File file = new File( testConfiguration.getFile()); + if( file.exists() ) + file.delete(); + + PowerMockito.when(reader.readCurrent()) + .thenReturn( + new StreamsResultSet(Queues.newConcurrentLinkedQueue( + Lists.newArrayList(testDatum1, testDatum2, testDatum3))) + ).thenReturn(null); + } + + @Test + public void testPersistStream() { + + assert(testConfiguration != null); + + Map<String, Object> streamConfig = Maps.newHashMap(); + streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 1000); + + StreamBuilder builder = new LocalStreamBuilder(1, streamConfig); + + FilePersistWriter fileWriter = new FilePersistWriter(testConfiguration); + FilePersistReader fileReader = new FilePersistReader(testConfiguration); + + builder.newReadCurrentStream("stdin", reader); + builder.addStreamsPersistWriter("writer", fileWriter, 1, "stdin"); + builder.newReadCurrentStream("reader", fileReader); + builder.addStreamsPersistWriter("stdout", writer, 1, "reader"); + + builder.start(); + + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + //Handle exception + } + + builder.stop(); + + Mockito.verify(writer).write(testDatum1); + Mockito.verify(writer).write(testDatum2); + Mockito.verify(writer).write(testDatum3); + + } + + @After + public void shutdownTest() { + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d72d8f7e/streams-contrib/streams-persist-file/src/test/java/org/apache/streams/file/test/TestFilePersist.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-file/src/test/java/org/apache/streams/file/test/TestFilePersist.java b/streams-contrib/streams-persist-file/src/test/java/org/apache/streams/file/test/TestFilePersist.java new file mode 100644 index 0000000..1ffd90a --- /dev/null +++ b/streams-contrib/streams-persist-file/src/test/java/org/apache/streams/file/test/TestFilePersist.java @@ -0,0 +1,60 @@ +package org.apache.streams.file.test; + +import com.google.common.collect.Lists; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.file.FileConfiguration; +import org.apache.streams.file.FilePersistReader; +import org.apache.streams.file.FilePersistWriter; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Properties; + +/** + * Created by sblackmon on 10/20/14. + */ +public class TestFilePersist { + + private FileConfiguration testConfiguration; + + @Test + public void testPersistWriterString() { + + testConfiguration = new FileConfiguration(); + //testConfiguration.setFile("./test-queue.txt"); + + File file = new File( testConfiguration.getFile()); + if( file.exists() ) + file.delete(); + + FilePersistWriter testPersistWriter = new FilePersistWriter(testConfiguration); + testPersistWriter.prepare(testConfiguration); + + String testJsonString = "{\"dummy\":\"true\"}"; + + testPersistWriter.write(new StreamsDatum(testJsonString, "test")); + + testPersistWriter.cleanUp(); + + FilePersistReader testPersistReader = new FilePersistReader(testConfiguration); + try { + testPersistReader.prepare(testConfiguration); + } catch( Throwable e ) { + e.printStackTrace(); + Assert.fail(); + } + + StreamsResultSet testResult = testPersistReader.readCurrent(); + + testPersistReader.cleanUp(); + + assert(testResult.size() == 1); + + } + +}
