Repository: flink
Updated Branches:
  refs/heads/master b5e2e3637 -> c532638a0


[FLINK-3854] Support Avro key-value rolling sink writer

This closes #1953


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c532638a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c532638a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c532638a

Branch: refs/heads/master
Commit: c532638a010d75f97184314ee86fd070168c6837
Parents: b5e2e36
Author: Igor Berman <[email protected]>
Authored: Sat Apr 30 17:54:52 2016 +0300
Committer: Robert Metzger <[email protected]>
Committed: Thu Jun 9 13:34:06 2016 +0200

----------------------------------------------------------------------
 .../connectors/fs/AvroKeyValueSinkWriter.java   | 309 +++++++++++++++++++
 .../connectors/fs/RollingSinkITCase.java        | 147 +++++++++
 2 files changed, 456 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c532638a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
 
b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
new file mode 100644
index 0000000..a8919c3
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
@@ -0,0 +1,309 @@
+package org.apache.flink.streaming.connectors.fs;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+* Implementation of an AvroKeyValue writer that can be used in a Sink.
+* Each entry is wrapped in a GenericRecord with key/value fields (same as
+* in the m/r lib).
+<pre>
+Usage:
+{@code
+               RollingSink<Tuple2<Long , Long>> sink = new 
RollingSink<Tuple2<Long , Long>>("/tmp/path");
+               sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd/HH/mm/"));
+               sink.setPendingSuffix(".avro");
+               Map<String,String> properties = new HashMap<>();
+               Schema longSchema = Schema.create(Type.LONG);
+               String keySchema = longSchema.toString();
+               String valueSchema = longSchema.toString();
+               properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, 
keySchema);
+               properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, 
valueSchema);
+               properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, 
Boolean.toString(true));
+               properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, 
DataFileConstants.SNAPPY_CODEC);
+               
+               sink.setWriter(
+                       new AvroKeyValueSinkWriter<Long , Long>(properties));
+               sink.setBatchSize(1024 * 1024 * 64); // this is 64 MB
+}
+</pre>
+*/
+public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, 
V>>  implements Writer<Tuple2<K, V>>, InputTypeConfigurable {
+       private static final long serialVersionUID = 1L;
+       public static final String CONF_OUTPUT_KEY_SCHEMA = 
"avro.schema.output.key"; // required property: key schema text, parsed with Schema.parse
+       public static final String CONF_OUTPUT_VALUE_SCHEMA = 
"avro.schema.output.value"; // required property: value schema text, parsed with Schema.parse
+       public static final String CONF_COMPRESS = FileOutputFormat.COMPRESS; // boolean property, read with default false
+       public static final String CONF_COMPRESS_CODEC = 
FileOutputFormat.COMPRESS_CODEC; // codec name, only read when CONF_COMPRESS is true
+       public static final String CONF_DEFLATE_LEVEL = "avro.deflate.level"; // int property, defaults to CodecFactory.DEFAULT_DEFLATE_LEVEL
+       public static final String CONF_XZ_LEVEL = "avro.xz.level"; // int property, defaults to CodecFactory.DEFAULT_XZ_LEVEL
+
+       private transient AvroKeyValueWriter<K, V> keyValueWriter; // created in open(); null until then
+
+       private final Map<String, String> properties; // the map handed to the constructor, also passed on by duplicate()
+
+       /**
+        * C'tor for the writer
+        * <p>
+        * You can provide different properties that will be used to configure 
avro key-value writer as simple properties map(see example above)
+        * @param properties
+        */
+       @SuppressWarnings("deprecation")
+       public AvroKeyValueSinkWriter(Map<String, String> properties) {
+               this.properties = properties;
+               
+               String keySchemaString = properties.get(CONF_OUTPUT_KEY_SCHEMA);
+               if (keySchemaString == null) {
+                       throw new IllegalStateException("No key schema 
provided, set '" + CONF_OUTPUT_KEY_SCHEMA + "' property");
+               }
+               Schema.parse(keySchemaString);//verifying that schema valid
+               
+               String valueSchemaString = 
properties.get(CONF_OUTPUT_VALUE_SCHEMA);
+               if (valueSchemaString == null) {
+                       throw new IllegalStateException("No value schema 
provided, set '" + CONF_OUTPUT_VALUE_SCHEMA + "' property");
+               }
+               Schema.parse(valueSchemaString);//verifying that schema valid
+       }
+
+       private boolean getBoolean(Map<String,String> conf, String key, boolean 
def) {
+               String value = conf.get(key);
+               if (value == null) {
+                       return def;
+               }
+               return Boolean.parseBoolean(value);
+       }
+       
+       private int getInt(Map<String,String> conf, String key, int def) {
+               String value = conf.get(key);
+               if (value == null) {
+                       return def;
+               }
+               return Integer.parseInt(value);
+       }
+
+       /**
+        * Chooses the Avro codec for the container file from the properties.
+        * Derived from AvroOutputFormatBase.getCompressionCodec(..).
+        * <p>
+        * Unless {@link #CONF_COMPRESS} is set to true, the null codec is
+        * returned. With compression enabled, deflate and xz use their level
+        * properties and any other codec name is resolved through
+        * {@link CodecFactory#fromString(String)}. When compression is enabled
+        * but no codec name is configured, deflate is used instead of handing
+        * {@code null} to {@code fromString}.
+        *
+        * @param conf the writer properties
+        * @return the codec factory to configure the Avro file writer with
+        */
+       private CodecFactory getCompressionCodec(Map<String,String> conf) {
+               if (!getBoolean(conf, CONF_COMPRESS, false)) {
+                       return CodecFactory.nullCodec();
+               }
+
+               int deflateLevel = getInt(conf, CONF_DEFLATE_LEVEL,
+                               CodecFactory.DEFAULT_DEFLATE_LEVEL);
+               int xzLevel = getInt(conf, CONF_XZ_LEVEL,
+                               CodecFactory.DEFAULT_XZ_LEVEL);
+               String outputCodec = conf.get(CONF_COMPRESS_CODEC);
+
+               // Compression requested without a codec name: default to deflate.
+               if (outputCodec == null
+                               || DataFileConstants.DEFLATE_CODEC.equals(outputCodec)) {
+                       return CodecFactory.deflateCodec(deflateLevel);
+               }
+               if (DataFileConstants.XZ_CODEC.equals(outputCodec)) {
+                       return CodecFactory.xzCodec(xzLevel);
+               }
+               return CodecFactory.fromString(outputCodec);
+       }
+
+       /**
+        * Opens the stream through the superclass and then creates the Avro
+        * key-value writer on top of it, using the codec and the key/value
+        * schemas taken from the properties.
+        */
+       @Override
+       @SuppressWarnings("deprecation")
+       public void open(FileSystem fs, Path path) throws IOException {
+               super.open(fs, path);
+
+               final CodecFactory codec = getCompressionCodec(properties);
+               final String keyJson = properties.get(CONF_OUTPUT_KEY_SCHEMA);
+               final Schema keySchema = Schema.parse(keyJson);
+               final String valueJson = properties.get(CONF_OUTPUT_VALUE_SCHEMA);
+               final Schema valueSchema = Schema.parse(valueJson);
+
+               keyValueWriter = new AvroKeyValueWriter<K, V>(
+                               keySchema, valueSchema, codec, getStream());
+       }
+
+       /**
+        * Closes the superclass first and the Avro key-value writer, if one
+        * was created, second.
+        */
+       @Override
+       public void close() throws IOException {
+               // Keep this order: super.close flushes inside.
+               super.close();
+               if (keyValueWriter == null) {
+                       return;
+               }
+               keyValueWriter.close();
+       }
+       
+       /**
+        * Syncs the Avro writer, when one exists, and then delegates to the
+        * superclass flush.
+        *
+        * @return whatever the superclass flush returns
+        */
+       @Override
+       public long flush() throws IOException {
+               final AvroKeyValueWriter<K, V> avroWriter = keyValueWriter;
+               if (avroWriter != null) {
+                       avroWriter.sync();
+               }
+               return super.flush();
+       }
+
+       /**
+        * Appends one tuple, using {@code f0} as the key and {@code f1} as
+        * the value.
+        */
+       @Override
+       public void write(Tuple2<K, V> element) throws IOException {
+               // Called only for its check: throws if the stream is not open.
+               getStream();
+               final K key = element.f0;
+               final V value = element.f1;
+               keyValueWriter.write(key, value);
+       }
+
+       @Override
+       public void setInputType(TypeInformation<?> type, ExecutionConfig 
executionConfig) {
+               if (!type.isTupleType()) {
+                       throw new IllegalArgumentException("Input 
TypeInformation is not a tuple type.");
+               }
+
+               TupleTypeInfoBase<?> tupleType = (TupleTypeInfoBase<?>) type;
+
+               if (tupleType.getArity() != 2) {
+                       throw new IllegalArgumentException("Input 
TypeInformation must be a Tuple2 type.");
+               }
+       }
+
+       /**
+        * Returns a new writer built from the same properties map.
+        */
+       @Override
+       public Writer<Tuple2<K, V>> duplicate() {
+               final Writer<Tuple2<K, V>> copy =
+                               new AvroKeyValueSinkWriter<K, V>(properties);
+               return copy;
+       }
+       
+       /**
+        * Writes key/value pairs as generic records to an Avro container file.
+        * Taken from the m/r avro lib to remove the dependency on it.
+        */
+       private static final class AvroKeyValueWriter<K, V> {
+               /** Schema of the key/value pair records stored in the file. */
+               private final Schema pairSchema;
+
+               /** Writer for the Avro container file. */
+               private final DataFileWriter<GenericRecord> fileWriter;
+
+               /** Single record wrapper reused for every pair written. */
+               private final AvroKeyValue<Object, Object> reusableRecord;
+
+               /**
+                * Same as the five-argument constructor, using
+                * {@link DataFileConstants#DEFAULT_SYNC_INTERVAL}.
+                */
+               AvroKeyValueWriter(Schema keySchema, Schema valueSchema,
+                               CodecFactory compressionCodec,
+                               OutputStream outputStream) throws IOException {
+                       this(keySchema, valueSchema, compressionCodec,
+                                       outputStream,
+                                       DataFileConstants.DEFAULT_SYNC_INTERVAL);
+               }
+
+               /**
+                * Builds the pair schema, creates the container file on
+                * {@code outputStream} with the given codec and sync interval,
+                * and prepares the reusable output record.
+                */
+               AvroKeyValueWriter(Schema keySchema, Schema valueSchema,
+                               CodecFactory compressionCodec,
+                               OutputStream outputStream,
+                               int syncInterval) throws IOException {
+                       pairSchema = AvroKeyValue.getSchema(keySchema, valueSchema);
+
+                       DatumWriter<GenericRecord> datumWriter =
+                                       new GenericDatumWriter<GenericRecord>(pairSchema);
+                       fileWriter = new DataFileWriter<GenericRecord>(datumWriter);
+                       fileWriter.setCodec(compressionCodec);
+                       fileWriter.setSyncInterval(syncInterval);
+                       fileWriter.create(pairSchema, outputStream);
+
+                       GenericRecord backing = new GenericData.Record(pairSchema);
+                       reusableRecord = new AvroKeyValue<Object, Object>(backing);
+               }
+
+               /** Stores the pair in the reusable record and appends it. */
+               void write(K key, V value) throws IOException {
+                       reusableRecord.setKey(key);
+                       reusableRecord.setValue(value);
+                       GenericRecord filled = reusableRecord.get();
+                       fileWriter.append(filled);
+               }
+
+               /** Closes the Avro file writer. */
+               void close() throws IOException {
+                       fileWriter.close();
+               }
+
+               /** Delegates to the Avro file writer's sync and returns its result. */
+               long sync() throws IOException {
+                       final long position = fileWriter.sync();
+                       return position;
+               }
+       }
+
+       /**
+        * Thin wrapper around a {@link GenericRecord} that has a 'key' and a
+        * 'value' field. Taken from AvroKeyValue in the avro-mapr lib.
+        */
+       public static class AvroKeyValue<K, V> {
+               /** Record name used for the key/value pair schema. */
+               public static final String KEY_VALUE_PAIR_RECORD_NAME =
+                               "KeyValuePair";
+
+               /** Namespace used for the key/value pair schema. */
+               public static final String KEY_VALUE_PAIR_RECORD_NAMESPACE =
+                               "org.apache.avro.mapreduce";
+
+               /** Field name under which the key is stored. */
+               public static final String KEY_FIELD = "key";
+
+               /** Field name under which the value is stored. */
+               public static final String VALUE_FIELD = "value";
+
+               /** The wrapped key/value generic record. */
+               public final GenericRecord mKeyValueRecord;
+
+               /**
+                * Wraps the given key/value record; the record is not copied.
+                */
+               public AvroKeyValue(GenericRecord keyValueRecord) {
+                       this.mKeyValueRecord = keyValueRecord;
+               }
+
+               /**
+                * Creates the generic record schema for a key/value pair.
+                *
+                * @return a record schema with two fields, 'key' and 'value',
+                *         typed by the given schemas
+                */
+               public static Schema getSchema(Schema keySchema,
+                               Schema valueSchema) {
+                       Schema pair = Schema.createRecord(
+                                       KEY_VALUE_PAIR_RECORD_NAME, "A key/value pair",
+                                       KEY_VALUE_PAIR_RECORD_NAMESPACE, false);
+
+                       Schema.Field keyField = new Schema.Field(
+                                       KEY_FIELD, keySchema, "The key", null);
+                       Schema.Field valueField = new Schema.Field(
+                                       VALUE_FIELD, valueSchema, "The value", null);
+
+                       pair.setFields(Arrays.asList(keyField, valueField));
+                       return pair;
+               }
+
+               /** Returns the wrapped record. */
+               public GenericRecord get() {
+                       return this.mKeyValueRecord;
+               }
+
+               /** Reads the 'key' field of the wrapped record. */
+               @SuppressWarnings("unchecked")
+               public K getKey() {
+                       final Object stored = mKeyValueRecord.get(KEY_FIELD);
+                       return (K) stored;
+               }
+
+               /** Reads the 'value' field of the wrapped record. */
+               @SuppressWarnings("unchecked")
+               public V getValue() {
+                       final Object stored = mKeyValueRecord.get(VALUE_FIELD);
+                       return (V) stored;
+               }
+
+               /** Writes the 'key' field of the wrapped record. */
+               public void setKey(K key) {
+                       mKeyValueRecord.put(KEY_FIELD, key);
+               }
+
+               /** Writes the 'value' field of the wrapped record. */
+               public void setValue(V value) {
+                       mKeyValueRecord.put(VALUE_FIELD, value);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c532638a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
 
b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
index 9770f41..b6c5bdb 100644
--- 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ 
b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -17,6 +17,16 @@
 */
 package org.apache.flink.streaming.connectors.fs;
 
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData.StringType;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
@@ -25,6 +35,7 @@ import org.apache.flink.runtime.taskmanager.MultiShotLatch;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import 
org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.AvroKeyValue;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.NetUtils;
@@ -48,6 +59,8 @@ import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Tests for {@link RollingSink}. These
@@ -296,6 +309,140 @@ public class RollingSinkITCase extends 
StreamingMultipleProgramsTestBase {
                reader.close();
                inStream.close();
        }
+       
+       
+       /**
+        * This tests {@link AvroKeyValueSinkWriter}
+        * with non-rolling output and without compression.
+        */
+       @Test
+       public void testNonRollingAvroKeyValueWithoutCompressionWriter() throws 
Exception {
+               final int NUM_ELEMENTS = 20;
+               final int PARALLELISM = 2;
+               final String outPath = hdfsURI + 
"/avro-kv-no-comp-non-rolling-out";
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(PARALLELISM);
+
+               DataStream<Tuple2<Integer, String>> source = env.addSource(new 
TestSourceFunction(NUM_ELEMENTS))
+                               .broadcast()
+                               .filter(new OddEvenFilter());
+
+
+               Map<String, String> properties = new HashMap<>();
+               Schema keySchema = Schema.create(Type.INT);
+               Schema valueSchema = Schema.create(Type.STRING);
+               properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, 
keySchema.toString());
+               properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, 
valueSchema.toString());
+               RollingSink<Tuple2<Integer, String>> sink = new 
RollingSink<Tuple2<Integer, String>>(outPath)
+                               .setWriter(new AvroKeyValueSinkWriter<Integer, 
String>(properties))
+                               .setBucketer(new NonRollingBucketer())
+                               .setPartPrefix("part")
+                               .setPendingPrefix("")
+                               .setPendingSuffix("");
+
+               source.addSink(sink);
+
+               env.execute("RollingSink Avro KeyValue Writer Test");
+
+               GenericData.setStringType(valueSchema, StringType.String);
+               Schema elementSchema = AvroKeyValue.getSchema(keySchema, 
valueSchema);
+
+               FSDataInputStream inStream = dfs.open(new Path(outPath + 
"/part-0-0"));
+               SpecificDatumReader<GenericRecord> elementReader = new 
SpecificDatumReader<GenericRecord>(elementSchema);
+               DataFileStream<GenericRecord> dataFileStream = new 
DataFileStream<GenericRecord>(inStream, elementReader);
+               for (int i = 0; i < NUM_ELEMENTS; i += 2) {
+                       AvroKeyValue<Integer, String> wrappedEntry = new 
AvroKeyValue<Integer, String>(dataFileStream.next());
+                       int key = wrappedEntry.getKey().intValue();
+                       Assert.assertEquals(i, key);
+                       String value = wrappedEntry.getValue();
+                       Assert.assertEquals("message #" + i, value);
+               }
+
+               dataFileStream.close();
+               inStream.close();
+
+               inStream = dfs.open(new Path(outPath + "/part-1-0"));
+               dataFileStream = new DataFileStream<GenericRecord>(inStream, 
elementReader);
+
+               for (int i = 1; i < NUM_ELEMENTS; i += 2) {
+                       AvroKeyValue<Integer, String> wrappedEntry = new 
AvroKeyValue<Integer, String>(dataFileStream.next());
+                       int key = wrappedEntry.getKey().intValue();
+                       Assert.assertEquals(i, key);
+                       String value = wrappedEntry.getValue();
+                       Assert.assertEquals("message #" + i, value);
+               }
+
+               dataFileStream.close();
+               inStream.close();
+       }
+       
+       /**
+        * This tests {@link AvroKeyValueSinkWriter}
+        * with non-rolling output and with compression.
+        */
+       @Test
+       public void testNonRollingAvroKeyValueWithCompressionWriter() throws 
Exception {
+               final int NUM_ELEMENTS = 20;
+               final int PARALLELISM = 2;
+               final String outPath = hdfsURI + 
"/avro-kv-no-comp-non-rolling-out";
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(PARALLELISM);
+
+               DataStream<Tuple2<Integer, String>> source = env.addSource(new 
TestSourceFunction(NUM_ELEMENTS))
+                               .broadcast()
+                               .filter(new OddEvenFilter());
+
+
+               Map<String, String> properties = new HashMap<>();
+               Schema keySchema = Schema.create(Type.INT);
+               Schema valueSchema = Schema.create(Type.STRING);
+               properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, 
keySchema.toString());
+               properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, 
valueSchema.toString());
+               properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, 
String.valueOf(true));
+               properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, 
DataFileConstants.SNAPPY_CODEC);
+               RollingSink<Tuple2<Integer, String>> sink = new 
RollingSink<Tuple2<Integer, String>>(outPath)
+                               .setWriter(new AvroKeyValueSinkWriter<Integer, 
String>(properties))
+                               .setBucketer(new NonRollingBucketer())
+                               .setPartPrefix("part")
+                               .setPendingPrefix("")
+                               .setPendingSuffix("");
+
+               source.addSink(sink);
+
+               env.execute("RollingSink Avro KeyValue Writer Test");
+
+               GenericData.setStringType(valueSchema, StringType.String);
+               Schema elementSchema = AvroKeyValue.getSchema(keySchema, 
valueSchema);
+
+               FSDataInputStream inStream = dfs.open(new Path(outPath + 
"/part-0-0"));
+               SpecificDatumReader<GenericRecord> elementReader = new 
SpecificDatumReader<GenericRecord>(elementSchema);
+               DataFileStream<GenericRecord> dataFileStream = new 
DataFileStream<GenericRecord>(inStream, elementReader);
+               for (int i = 0; i < NUM_ELEMENTS; i += 2) {
+                       AvroKeyValue<Integer, String> wrappedEntry = new 
AvroKeyValue<Integer, String>(dataFileStream.next());
+                       int key = wrappedEntry.getKey().intValue();
+                       Assert.assertEquals(i, key);
+                       String value = wrappedEntry.getValue();
+                       Assert.assertEquals("message #" + i, value);
+               }
+
+               dataFileStream.close();
+               inStream.close();
+
+               inStream = dfs.open(new Path(outPath + "/part-1-0"));
+               dataFileStream = new DataFileStream<GenericRecord>(inStream, 
elementReader);
+
+               for (int i = 1; i < NUM_ELEMENTS; i += 2) {
+                       AvroKeyValue<Integer, String> wrappedEntry = new 
AvroKeyValue<Integer, String>(dataFileStream.next());
+                       int key = wrappedEntry.getKey().intValue();
+                       Assert.assertEquals(i, key);
+                       String value = wrappedEntry.getValue();
+                       Assert.assertEquals("message #" + i, value);
+               }
+
+               dataFileStream.close();
+               inStream.close();
+       }
+       
 
        // we use this to synchronize the clock changes to elements being 
processed
        final static MultiShotLatch latch1 = new MultiShotLatch();

Reply via email to