seojangho closed pull request #50: [NEMO-32] Support as byte[] encoding & decoding
URL: https://github.com/apache/incubator-nemo/pull/50
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/common/src/main/java/edu/snu/nemo/common/coder/BytesDecoderFactory.java b/common/src/main/java/edu/snu/nemo/common/coder/BytesDecoderFactory.java new file mode 100644 index 00000000..069e2542 --- /dev/null +++ b/common/src/main/java/edu/snu/nemo/common/coder/BytesDecoderFactory.java @@ -0,0 +1,86 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.common.coder; + +import edu.snu.nemo.common.DirectByteArrayOutputStream; + +import java.io.IOException; +import java.io.InputStream; + +/** + * A {@link DecoderFactory} which is used for an array of bytes. + */ +public final class BytesDecoderFactory implements DecoderFactory<byte[]> { + + private static final BytesDecoderFactory BYTES_DECODER_FACTORY = new BytesDecoderFactory(); + + /** + * A private constructor. + */ + private BytesDecoderFactory() { + // do nothing. + } + + /** + * Static initializer of the decoder. + */ + public static BytesDecoderFactory of() { + return BYTES_DECODER_FACTORY; + } + + @Override + public Decoder<byte[]> create(final InputStream inputStream) { + return new BytesDecoder(inputStream); + } + + /** + * BytesDecoder. 
+ */ + private final class BytesDecoder implements Decoder<byte[]> { + + private final InputStream inputStream; + + /** + * Constructor. + * + * @param inputStream the input stream to decode. + */ + private BytesDecoder(final InputStream inputStream) { + this.inputStream = inputStream; + } + + @Override + public byte[] decode() throws IOException { + // We cannot use inputStream.available() to know the length of bytes to read. + // The available method only returns the number of bytes can be read without blocking. + final DirectByteArrayOutputStream byteOutputStream = new DirectByteArrayOutputStream(); + int b = inputStream.read(); + while (b != -1) { + byteOutputStream.write(b); + b = inputStream.read(); + } + + final int lengthToRead = byteOutputStream.getCount(); + if (lengthToRead == 0) { + throw new IOException("EoF (empty partition)!"); // TODO #?: use EOF exception instead of IOException. + } + final byte[] resultBytes = new byte[lengthToRead]; // Read the size of this byte array. + System.arraycopy(byteOutputStream.getBufDirectly(), 0, resultBytes, 0, lengthToRead); + + return resultBytes; + } + } +} diff --git a/common/src/main/java/edu/snu/nemo/common/coder/BytesEncoderFactory.java b/common/src/main/java/edu/snu/nemo/common/coder/BytesEncoderFactory.java new file mode 100644 index 00000000..77188429 --- /dev/null +++ b/common/src/main/java/edu/snu/nemo/common/coder/BytesEncoderFactory.java @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.common.coder; + +import java.io.*; + +/** + * A {@link EncoderFactory} which is used for an array of bytes. + */ +public final class BytesEncoderFactory implements EncoderFactory<byte[]> { + + private static final BytesEncoderFactory BYTES_ENCODER_FACTORY = new BytesEncoderFactory(); + + /** + * A private constructor. + */ + private BytesEncoderFactory() { + // do nothing. + } + + /** + * Static initializer of the encoder. + */ + public static BytesEncoderFactory of() { + return BYTES_ENCODER_FACTORY; + } + + @Override + public Encoder<byte[]> create(final OutputStream outputStream) { + return new BytesEncoder(outputStream); + } + + /** + * BytesEncoder. + */ + private final class BytesEncoder implements Encoder<byte[]> { + + private final OutputStream outputStream; + + /** + * Constructor. + * + * @param outputStream the output stream to store the encoded bytes. + */ + private BytesEncoder(final OutputStream outputStream) { + this.outputStream = outputStream; + } + + @Override + public void encode(final byte[] value) throws IOException { + // Write the byte[] as is. + // Because this interface use the length of byte[] element, + // the element must not have any padding bytes. 
+ outputStream.write(value); + } + } +} diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishEdgeDecoderPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishEdgeDecoderPass.java new file mode 100644 index 00000000..fc8b7a34 --- /dev/null +++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishEdgeDecoderPass.java @@ -0,0 +1,54 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating; + +import edu.snu.nemo.common.coder.BytesDecoderFactory; +import edu.snu.nemo.common.dag.DAG; +import edu.snu.nemo.common.ir.edge.IREdge; +import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty; +import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty; +import edu.snu.nemo.common.ir.vertex.IRVertex; + +import java.util.Collections; +import java.util.List; + +/** + * A pass to support Sailfish-like shuffle by tagging edges. + * This pass modifies the decoder property toward {@link edu.snu.nemo.common.ir.vertex.transform.RelayTransform} + * to read data as byte arrays. + */ +public final class SailfishEdgeDecoderPass extends AnnotatingPass { + /** + * Default constructor. 
+ */ + public SailfishEdgeDecoderPass() { + super(DecoderProperty.class, Collections.singleton(DataCommunicationPatternProperty.class)); + } + + @Override + public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) { + dag.getVertices().forEach(vertex -> { + final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex); + inEdges.forEach(edge -> { + if (edge.getPropertyValue(DataCommunicationPatternProperty.class).get() + .equals(DataCommunicationPatternProperty.Value.Shuffle)) { + edge.setProperty(DecoderProperty.of(BytesDecoderFactory.of())); + } + }); + }); + return dag; + } +} diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishEdgeEncoderPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishEdgeEncoderPass.java new file mode 100644 index 00000000..418b9af4 --- /dev/null +++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishEdgeEncoderPass.java @@ -0,0 +1,57 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating; + +import edu.snu.nemo.common.coder.BytesEncoderFactory; +import edu.snu.nemo.common.dag.DAG; +import edu.snu.nemo.common.ir.edge.IREdge; +import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty; +import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty; +import edu.snu.nemo.common.ir.vertex.IRVertex; + +import java.util.Collections; +import java.util.List; + +/** + * A pass to support Sailfish-like shuffle by tagging edges. + * This pass modifies the encoder property toward {@link edu.snu.nemo.common.ir.vertex.transform.RelayTransform} + * to write data as byte arrays. + */ +public final class SailfishEdgeEncoderPass extends AnnotatingPass { + /** + * Default constructor. + */ + public SailfishEdgeEncoderPass() { + super(EncoderProperty.class, Collections.singleton(DataCommunicationPatternProperty.class)); + } + + @Override + public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) { + dag.getVertices().forEach(vertex -> { + final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex); + inEdges.forEach(edge -> { + if (edge.getPropertyValue(DataCommunicationPatternProperty.class).get() + .equals(DataCommunicationPatternProperty.Value.Shuffle)) { + dag.getOutgoingEdgesOf(edge.getDst()) + .forEach(edgeFromRelay -> { + edgeFromRelay.setProperty(EncoderProperty.of(BytesEncoderFactory.of())); + }); + } + }); + }); + return dag; + } +} diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java index aa77ca1d..d5c50e4b 100644 --- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java +++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java @@ -15,9 +15,7 @@ */ 
package edu.snu.nemo.compiler.optimizer.pass.compiletime.composite; -import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.SailfishEdgeDataFlowModelPass; -import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.SailfishEdgeDataStorePass; -import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.SailfishEdgeUsedDataHandlingPass; +import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.*; import edu.snu.nemo.compiler.optimizer.pass.compiletime.reshaping.SailfishRelayReshapingPass; import java.util.Arrays; @@ -34,6 +32,8 @@ public SailfishPass() { new SailfishRelayReshapingPass(), new SailfishEdgeDataFlowModelPass(), new SailfishEdgeDataStorePass(), + new SailfishEdgeDecoderPass(), + new SailfishEdgeEncoderPass(), new SailfishEdgeUsedDataHandlingPass() )); } diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java index cd456fef..e779f581 100644 --- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java +++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java @@ -16,12 +16,11 @@ package edu.snu.nemo.tests.compiler.optimizer.pass.compiletime.composite; import edu.snu.nemo.client.JobLauncher; +import edu.snu.nemo.common.coder.BytesDecoderFactory; +import edu.snu.nemo.common.coder.BytesEncoderFactory; import edu.snu.nemo.common.dag.DAG; import edu.snu.nemo.common.ir.edge.IREdge; -import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty; -import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty; -import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty; -import edu.snu.nemo.common.ir.edge.executionproperty.UsedDataHandlingProperty; +import edu.snu.nemo.common.ir.edge.executionproperty.*; 
import edu.snu.nemo.common.ir.vertex.IRVertex; import edu.snu.nemo.compiler.optimizer.pass.compiletime.composite.SailfishPass; import edu.snu.nemo.tests.compiler.CompilerTestUtil; @@ -54,7 +53,7 @@ public void testSailfish() { if (processedDAG.getIncomingEdgesOf(irVertex).stream().anyMatch(irEdge -> DataCommunicationPatternProperty.Value.Shuffle .equals(irEdge.getPropertyValue(DataCommunicationPatternProperty.class).get()))) { - // Merger vertex + // Relay vertex processedDAG.getIncomingEdgesOf(irVertex).forEach(edgeToMerger -> { if (DataCommunicationPatternProperty.Value.Shuffle .equals(edgeToMerger.getPropertyValue(DataCommunicationPatternProperty.class).get())) { @@ -64,6 +63,8 @@ public void testSailfish() { edgeToMerger.getPropertyValue(UsedDataHandlingProperty.class).get()); assertEquals(DataStoreProperty.Value.SerializedMemoryStore, edgeToMerger.getPropertyValue(DataStoreProperty.class).get()); + assertEquals(BytesDecoderFactory.of(), + edgeToMerger.getPropertyValue(DecoderProperty.class).get()); } else { assertEquals(DataFlowModelProperty.Value.Pull, edgeToMerger.getPropertyValue(DataFlowModelProperty.class).get()); @@ -76,6 +77,8 @@ public void testSailfish() { edgeFromMerger.getPropertyValue(DataCommunicationPatternProperty.class).get()); assertEquals(DataStoreProperty.Value.LocalFileStore, edgeFromMerger.getPropertyValue(DataStoreProperty.class).get()); + assertEquals(BytesEncoderFactory.of(), + edgeFromMerger.getPropertyValue(EncoderProperty.class).get()); }); } else { // Non merger vertex. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
