Repository: beam
Updated Branches:
  refs/heads/release-2.0.0 5763c384d -> acb3f6a9c
http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java
index d120f72..5df2bcf 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java
@@ -34,12 +34,23 @@ public class PubsubMessagePayloadOnlyCoder extends CustomCoder<PubsubMessage> {
   }
 
   @Override
+  public void encode(PubsubMessage value, OutputStream outStream)
+      throws IOException {
+    encode(value, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(PubsubMessage value, OutputStream outStream, Context context)
       throws IOException {
     PAYLOAD_CODER.encode(value.getPayload(), outStream, context);
   }
 
   @Override
+  public PubsubMessage decode(InputStream inStream) throws IOException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public PubsubMessage decode(InputStream inStream, Context context) throws IOException {
     return new PubsubMessage(
         PAYLOAD_CODER.decode(inStream, context), ImmutableMap.<String, String>of());

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
index e061edc..bcf7656 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
@@ -45,15 +45,26 @@ public class PubsubMessageWithAttributesCoder extends CustomCoder<PubsubMessage>
     return new PubsubMessageWithAttributesCoder();
   }
 
+  @Override
+  public void encode(PubsubMessage value, OutputStream outStream)
+      throws IOException {
+    encode(value, outStream, Context.NESTED);
+  }
+
   public void encode(PubsubMessage value, OutputStream outStream, Context context)
       throws IOException {
-    PAYLOAD_CODER.encode(value.getPayload(), outStream, context.nested());
+    PAYLOAD_CODER.encode(value.getPayload(), outStream);
     ATTRIBUTES_CODER.encode(value.getAttributeMap(), outStream, context);
   }
 
   @Override
+  public PubsubMessage decode(InputStream inStream) throws IOException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public PubsubMessage decode(InputStream inStream, Context context) throws IOException {
-    byte[] payload = PAYLOAD_CODER.decode(inStream, context.nested());
+    byte[] payload = PAYLOAD_CODER.decode(inStream);
     Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream, context);
     return new PubsubMessage(payload, attributes);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
index 9f04a6c..ad38e28 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -108,21 +108,21 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>,
 
     @Override
     public void encode(
-        OutgoingMessage value, OutputStream outStream, Context context)
+        OutgoingMessage value, OutputStream outStream)
         throws CoderException, IOException {
-      ByteArrayCoder.of().encode(value.elementBytes, outStream, context.nested());
-      ATTRIBUTES_CODER.encode(value.attributes, outStream, context.nested());
-      BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream, context.nested());
-      RECORD_ID_CODER.encode(value.recordId, outStream, context.nested());
+      ByteArrayCoder.of().encode(value.elementBytes, outStream);
+      ATTRIBUTES_CODER.encode(value.attributes, outStream);
+      BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream);
+      RECORD_ID_CODER.encode(value.recordId, outStream);
     }
 
     @Override
     public OutgoingMessage decode(
-        InputStream inStream, Context context) throws CoderException, IOException {
-      byte[] elementBytes = ByteArrayCoder.of().decode(inStream, context.nested());
-      Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream, context.nested());
-      long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream, context.nested());
-      @Nullable String recordId = RECORD_ID_CODER.decode(inStream, context.nested());
+        InputStream inStream) throws CoderException, IOException {
+      byte[] elementBytes = ByteArrayCoder.of().decode(inStream);
+      Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream);
+      long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream);
+      @Nullable String recordId = RECORD_ID_CODER.decode(inStream);
       return new OutgoingMessage(elementBytes, attributes, timestampMsSinceEpoch, recordId);
     }
   }
http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index c16b8fb..e8fe701 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -369,19 +369,18 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
     private PubsubCheckpointCoder() {}
 
     @Override
-    public void encode(PubsubCheckpoint value, OutputStream outStream, Context context)
+    public void encode(PubsubCheckpoint value, OutputStream outStream)
         throws IOException {
       SUBSCRIPTION_PATH_CODER.encode(
           value.subscriptionPath,
-          outStream,
-          context.nested());
-      LIST_CODER.encode(value.notYetReadIds, outStream, context);
+          outStream);
+      LIST_CODER.encode(value.notYetReadIds, outStream);
     }
 
     @Override
-    public PubsubCheckpoint decode(InputStream inStream, Context context) throws IOException {
-      String path = SUBSCRIPTION_PATH_CODER.decode(inStream, context.nested());
-      List<String> notYetReadIds = LIST_CODER.decode(inStream, context);
+    public PubsubCheckpoint decode(InputStream inStream) throws IOException {
+      String path = SUBSCRIPTION_PATH_CODER.decode(inStream);
+      List<String> notYetReadIds = LIST_CODER.decode(inStream);
       return new PubsubCheckpoint(path, null, null, notYetReadIds);
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index d60c721..70d5377 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -748,12 +748,23 @@ public class BigQueryIOTest implements Serializable {
    */
   private static class PartitionedGlobalWindowCoder extends AtomicCoder<PartitionedGlobalWindow> {
     @Override
+    public void encode(PartitionedGlobalWindow window, OutputStream outStream)
+        throws IOException, CoderException {
+      encode(window, outStream, Context.NESTED);
+    }
+
+    @Override
     public void encode(PartitionedGlobalWindow window, OutputStream outStream, Context context)
         throws IOException, CoderException {
       StringUtf8Coder.of().encode(window.value, outStream, context);
     }
 
     @Override
+    public PartitionedGlobalWindow decode(InputStream inStream) throws IOException, CoderException {
+      return decode(inStream, Context.NESTED);
+    }
+
+    @Override
     public PartitionedGlobalWindow decode(InputStream inStream, Context context)
         throws IOException, CoderException {
       return new PartitionedGlobalWindow(StringUtf8Coder.of().decode(inStream, context));

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
index 8fddfe0..8d2598a 100644
---
a/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
+++ b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
@@ -68,13 +68,13 @@ public class WritableCoder<T extends Writable> extends CustomCoder<T> {
   }
 
   @Override
-  public void encode(T value, OutputStream outStream, Context context) throws IOException {
+  public void encode(T value, OutputStream outStream) throws IOException {
     value.write(new DataOutputStream(outStream));
   }
 
   @SuppressWarnings("unchecked")
   @Override
-  public T decode(InputStream inStream, Context context) throws IOException {
+  public T decode(InputStream inStream) throws IOException {
     try {
       if (type == NullWritable.class) {
         // NullWritable has no default constructor

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
index 7cc043c..501fe09 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
@@ -44,16 +44,14 @@ class HBaseMutationCoder extends AtomicCoder<Mutation> implements Serializable {
   }
 
   @Override
-  public void encode(Mutation mutation, OutputStream outStream,
-                     Coder.Context context) throws IOException {
+  public void encode(Mutation mutation, OutputStream outStream) throws IOException {
     MutationType type = getType(mutation);
     MutationProto proto = ProtobufUtil.toMutation(type, mutation);
     proto.writeDelimitedTo(outStream);
   }
 
   @Override
-  public Mutation decode(InputStream inStream,
-                         Coder.Context context) throws IOException {
+  public Mutation decode(InputStream inStream) throws
IOException {
     return ProtobufUtil.toMutation(MutationProto.parseDelimitedFrom(inStream));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
index 24a5f7f..1d06635 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
@@ -41,13 +41,13 @@ class HBaseResultCoder extends AtomicCoder<Result> implements Serializable {
   }
 
   @Override
-  public void encode(Result value, OutputStream outputStream, Coder.Context context)
+  public void encode(Result value, OutputStream outputStream)
       throws IOException {
     ProtobufUtil.toResult(value).writeDelimitedTo(outputStream);
   }
 
   @Override
-  public Result decode(InputStream inputStream, Coder.Context context)
+  public Result decode(InputStream inputStream)
       throws IOException {
     return ProtobufUtil.toResult(ClientProtos.Result.parseDelimitedFrom(inputStream));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index ba84c2a..e21945f 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1597,13 +1597,13 @@ public class KafkaIO {
 
   private static class NullOnlyCoder<T> extends AtomicCoder<T> {
     @Override
-    public void encode(T value, OutputStream
outStream, Context context) {
+    public void encode(T value, OutputStream outStream) {
       checkArgument(value == null, "Can only encode nulls");
       // Encode as no bytes.
     }
 
     @Override
-    public T decode(InputStream inStream, Context context) {
+    public T decode(InputStream inStream) {
       return null;
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
index d838a0d..1971060 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
@@ -50,6 +50,12 @@ public class KafkaRecordCoder<K, V> extends StructuredCoder<KafkaRecord<K, V>> {
   }
 
   @Override
+  public void encode(KafkaRecord<K, V> value, OutputStream outStream)
+      throws CoderException, IOException {
+    encode(value, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(KafkaRecord<K, V> value, OutputStream outStream, Context context)
                          throws CoderException, IOException {
     Context nested = context.nested();
@@ -61,6 +67,11 @@ public class KafkaRecordCoder<K, V> extends StructuredCoder<KafkaRecord<K, V>> {
   }
 
   @Override
+  public KafkaRecord<K, V> decode(InputStream inStream) throws CoderException, IOException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public KafkaRecord<K, V> decode(InputStream inStream, Context context)
                                       throws CoderException, IOException {
     Context nested = context.nested();

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
----------------------------------------------------------------------
diff --git
a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
index 77fe127..f233e27 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
@@ -43,30 +43,28 @@ class KinesisRecordCoder extends AtomicCoder<KinesisRecord> {
     }
 
     @Override
-    public void encode(KinesisRecord value, OutputStream outStream, Context context) throws
+    public void encode(KinesisRecord value, OutputStream outStream) throws
             IOException {
-        Context nested = context.nested();
-        BYTE_ARRAY_CODER.encode(value.getData().array(), outStream, nested);
-        STRING_CODER.encode(value.getSequenceNumber(), outStream, nested);
-        STRING_CODER.encode(value.getPartitionKey(), outStream, nested);
-        INSTANT_CODER.encode(value.getApproximateArrivalTimestamp(), outStream, nested);
-        VAR_LONG_CODER.encode(value.getSubSequenceNumber(), outStream, nested);
-        INSTANT_CODER.encode(value.getReadTime(), outStream, nested);
-        STRING_CODER.encode(value.getStreamName(), outStream, nested);
-        STRING_CODER.encode(value.getShardId(), outStream, context);
+        BYTE_ARRAY_CODER.encode(value.getData().array(), outStream);
+        STRING_CODER.encode(value.getSequenceNumber(), outStream);
+        STRING_CODER.encode(value.getPartitionKey(), outStream);
+        INSTANT_CODER.encode(value.getApproximateArrivalTimestamp(), outStream);
+        VAR_LONG_CODER.encode(value.getSubSequenceNumber(), outStream);
+        INSTANT_CODER.encode(value.getReadTime(), outStream);
+        STRING_CODER.encode(value.getStreamName(), outStream);
+        STRING_CODER.encode(value.getShardId(), outStream);
     }
 
     @Override
-    public KinesisRecord decode(InputStream inStream, Context context) throws IOException {
-        Context nested = context.nested();
-        ByteBuffer data = ByteBuffer.wrap(BYTE_ARRAY_CODER.decode(inStream, nested));
-        String
sequenceNumber = STRING_CODER.decode(inStream, nested);
-        String partitionKey = STRING_CODER.decode(inStream, nested);
-        Instant approximateArrivalTimestamp = INSTANT_CODER.decode(inStream, nested);
-        long subSequenceNumber = VAR_LONG_CODER.decode(inStream, nested);
-        Instant readTimestamp = INSTANT_CODER.decode(inStream, nested);
-        String streamName = STRING_CODER.decode(inStream, nested);
-        String shardId = STRING_CODER.decode(inStream, context);
+    public KinesisRecord decode(InputStream inStream) throws IOException {
+        ByteBuffer data = ByteBuffer.wrap(BYTE_ARRAY_CODER.decode(inStream));
+        String sequenceNumber = STRING_CODER.decode(inStream);
+        String partitionKey = STRING_CODER.decode(inStream);
+        Instant approximateArrivalTimestamp = INSTANT_CODER.decode(inStream);
+        long subSequenceNumber = VAR_LONG_CODER.decode(inStream);
+        Instant readTimestamp = INSTANT_CODER.decode(inStream);
+        String streamName = STRING_CODER.decode(inStream);
+        String shardId = STRING_CODER.decode(inStream);
         return new KinesisRecord(data, sequenceNumber, subSequenceNumber, partitionKey,
                 approximateArrivalTimestamp, readTimestamp, streamName, shardId
         );

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
index 5b2ec02..d4c0440 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
@@ -88,15 +88,8 @@ public class JAXBCoder<T> extends CustomCoder<T> {
   }
 
   @Override
-  public void encode(T value, OutputStream outStream) throws IOException {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    try {
-      jaxbMarshaller.get().marshal(value, baos);
-    } catch (JAXBException e)
{
-      throw new CoderException(e);
-    }
-    VarInt.encode(baos.size(), outStream);
-    baos.writeTo(outStream);
+  public void encode(T value, OutputStream outStream) throws CoderException, IOException {
+    encode(value, outStream, Context.NESTED);
   }
 
   @Override
@@ -109,11 +102,23 @@ public class JAXBCoder<T> extends CustomCoder<T> {
         throw new CoderException(e);
       }
     } else {
-      encode(value, outStream);
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      try {
+        jaxbMarshaller.get().marshal(value, baos);
+      } catch (JAXBException e) {
+        throw new CoderException(e);
+      }
+      VarInt.encode(baos.size(), outStream);
+      baos.writeTo(outStream);
     }
   }
 
   @Override
+  public T decode(InputStream inStream) throws CoderException, IOException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public T decode(InputStream inStream, Context context) throws CoderException, IOException {
     try {
       if (!context.isWholeStream) {

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
index 2b4503a..c175e4a 100644
--- a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
+++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
@@ -178,20 +178,29 @@ public class JAXBCoderTest {
     }
 
     @Override
+    public void encode(TestType value, OutputStream outStream)
+        throws CoderException, IOException {
+      encode(value, outStream, Context.NESTED);
+    }
+
+    @Override
     public void encode(TestType value, OutputStream outStream, Context context)
         throws CoderException, IOException {
-      Context nestedContext = context.nested();
-      VarIntCoder.of().encode(3, outStream, nestedContext);
-      jaxbCoder.encode(value, outStream, nestedContext);
+
VarIntCoder.of().encode(3, outStream);
+      jaxbCoder.encode(value, outStream);
       VarLongCoder.of().encode(22L, outStream, context);
     }
 
     @Override
+    public TestType decode(InputStream inStream) throws CoderException, IOException {
+      return decode(inStream, Context.NESTED);
+    }
+
+    @Override
     public TestType decode(InputStream inStream, Context context)
         throws CoderException, IOException {
-      Context nestedContext = context.nested();
-      VarIntCoder.of().decode(inStream, nestedContext);
-      TestType result = jaxbCoder.decode(inStream, nestedContext);
+      VarIntCoder.of().decode(inStream);
+      TestType result = jaxbCoder.decode(inStream);
       VarLongCoder.of().decode(inStream, context);
       return result;
     }
