Baunsgaard commented on code in PR #1867: URL: https://github.com/apache/systemds/pull/1867#discussion_r1268626678
########## src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java: ########## @@ -22,14 +22,14 @@ import java.io.Serializable; import java.net.ConnectException; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; Review Comment: No wildcard imports; you can configure your IDE not to generate them. ########## src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java: ########## @@ -21,18 +21,24 @@ import java.io.Serializable; import java.security.cert.CertificateException; +import java.util.Optional; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLException; +import io.netty.channel.*; Review Comment: No wildcard imports. ########## src/test/scripts/functions/federated/io/FederatedCompressionTest1Reference.dml: ########## @@ -0,0 +1,24 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +#------------------------------------------------------------- + +x = read($fedmatrix) +x = x+1 +write(x, $out) Review Comment: Missing newline at end of file. ########## src/main/java/org/apache/sysds/runtime/controlprogram/federated/compression/CompressionEncoder.java: ########## @@ -0,0 +1,23 @@ +package org.apache.sysds.runtime.controlprogram.federated.compression; Review Comment: Missing license header. ########## src/test/scripts/functions/federated/io/FederatedCompressionTest.dml: ########## @@ -0,0 +1,24 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +#------------------------------------------------------------- + +x = read($fedmatrix) +x = x+1 +write(x, $out) Review Comment: Missing newline at end of file. ########## src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java: ########## @@ -238,12 +233,18 @@ private static ChannelInitializer<SocketChannel> createChannel(InetSocketAddress @Override protected void initChannel(SocketChannel ch) throws Exception { final ChannelPipeline cp = ch.pipeline(); + final Optional<ImmutablePair<ChannelInboundHandlerAdapter, ChannelOutboundHandlerAdapter>> compressionStrategy = FederationUtils.compressionStrategy(); cp.addLast("NetworkTrafficCounter", new NetworkTrafficCounter(FederatedStatistics::logServerTraffic)); if(ssl) cp.addLast(createSSLHandler(ch, address)); if(timeout > -1) cp.addLast(new ReadTimeoutHandler(timeout)); - cp.addLast(FederationUtils.decoder(), new FederatedRequestEncoder(), handler); + // cp.addLast(FederationUtils.decoder(), new FederatedRequestEncoder(), handler); + compressionStrategy.ifPresent(strategy -> cp.addLast(strategy.left)); + cp.addLast(FederationUtils.decoder()); + compressionStrategy.ifPresent(strategy -> cp.addLast(strategy.right)); + cp.addLast(new FederatedRequestEncoder()); Review Comment: Is it correct that the decoder is always added, even if no compression is specified? Should the decoder not be added only conditionally? Or is that what the ifPresent call does? ########## src/test/scripts/functions/federated/io/config/NoneCompressionConfig.xml: ########## @@ -0,0 +1,29 @@ +<!-- + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. +--> + +<root> + <!-- The number of theads for the spark instance artificially selected--> + <sysds.local.spark.number.threads>2</sysds.local.spark.number.threads> + <!-- The timeout of the federated tests to initialize the federated matrixes --> + <sysds.federated.initialization.timeout>2</sysds.federated.initialization.timeout> + <!-- The timeout of each instruction sent to federated workers --> + <sysds.federated.timeout>128</sysds.federated.timeout> + <!-- sets the federated compression strategy (none, zlib) --> + <sysds.federated.compression>none</sysds.federated.compression> +</root> Review Comment: Missing newline at end of file. ########## src/test/java/org/apache/sysds/test/functions/federated/io/FederatedCompressionTest.java: ########## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysds.test.functions.federated.io; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.sysds.common.Types; +import org.apache.sysds.runtime.controlprogram.caching.MatrixObject; +import org.apache.sysds.runtime.matrix.data.MatrixValue; +import org.apache.sysds.runtime.meta.MatrixCharacteristics; +import org.apache.sysds.test.AutomatedTestBase; +import org.apache.sysds.test.TestConfiguration; +import org.apache.sysds.test.TestUtils; +import org.apache.sysds.test.functions.federated.FederatedTestObjectConstructor; +import org.apache.sysds.utils.stats.FederatedCompressionStatistics; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; + +@RunWith(value = Parameterized.class) +@net.jcip.annotations.NotThreadSafe +public class FederatedCompressionTest extends AutomatedTestBase { + + private static final Log LOG = LogFactory.getLog(FederatedCompressionTest.class.getName()); + private final static String TEST_DIR = "functions/federated/io/"; + private final static String TEST_NAME = "FederatedCompressionTest"; + private final static String TEST_CLASS_DIR = TEST_DIR + FederatedCompressionTest.class.getSimpleName() + "/"; + private final static int blocksize = 1024; + private final static String OUTPUT_NAME = "Z"; + private final static String TEST_CONF_FOLDER = SCRIPT_DIR + TEST_DIR + "config/"; + + @Parameterized.Parameter() + public String compressionStrategy; + @Parameterized.Parameter(1) + public int dim; + @Parameterized.Parameter(2) + public long[][] begins; + @Parameterized.Parameter(3) + public long[][] ends; + + protected String getTestDir() { + return "functions/federated/io/"; + } + + @Override + public void setUp() { + TestUtils.clearAssertionInformation(); + addTestConfiguration(TEST_NAME, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {OUTPUT_NAME})); + } + + @Parameterized.Parameters + public static Collection<Object[]> data() { + return Arrays.asList(new Object[][] { + // {compressionStrategy, dim, begins, ends} + {"None", 3, new long[][] {new long[] {0, 0}}, new long[][] {new long[] {3, 3}}}, + {"Zlib", 3, new long[][] {new long[] {0, 0}}, new long[][] {new long[] {3, 3}}}, + {"None", 1000, new long[][] {new long[] {0, 0}}, new long[][] {new long[] {1000, 1000}}}, + {"Zlib", 1000, new long[][] {new long[] {0, 0}}, new long[][] {new long[] {1000, 1000}}}, + }); + } + + @Test + public void testFederatedReadWriteCompressionStrategies() { + federatedReadWriteCompression(); + } + + public void federatedReadWriteCompression() { + System.out.println("CompressionStrategy: " + compressionStrategy); + System.out.println("Dim: " + dim); Review Comment: Try to avoid printing in the tests, to reduce the clutter when the tests are run on GitHub. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@systemds.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org