fapifta commented on a change in pull request #2507: URL: https://github.com/apache/ozone/pull/2507#discussion_r690601965
########## File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java ########## @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.client.io; + +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.storage.BlockInputStream; +import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; +import org.apache.hadoop.security.token.Token; + +/** + * Interface used to create BlockInputStreams. + */ +public interface BlockInputStreamFactory { Review comment: As we discussed on other channels, we may want to avoid the confusion coming from the name "factory", as this is pretty much just an interface defining a method that creates (supplies) the stream, thereby making it possible to inject mock instances from tests. We might want to use a Supplier<BlockInputStream> in the constructor where the factory is currently provided; then from tests we can provide a TestBlockInputStream or a mocked InputStream, while in the production code we can just provide a new BlockInputStream from a lambda expression. 
########## File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java ########## @@ -0,0 +1,405 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.client.rpc.read; + +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.client.ECReplicationConfig; +import org.apache.hadoop.hdds.client.ReplicationConfig; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.apache.hadoop.hdds.scm.XceiverClientFactory; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.hdds.scm.storage.BlockInputStream; +import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; +import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory; +import org.apache.hadoop.ozone.client.io.ECBlockInputStream; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.hadoop.security.token.Token; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; 
+import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +/** + * Tests for ECBlockInputStream. + */ +public class TestECBlockInputStream { + + private static final int ONEMB = 1024 * 1024; + + @Test + // TODO - this test will need changed when we can do recovery reads. + public void testSufficientLocations() { + ECReplicationConfig repConfig = new ECReplicationConfig(3, 2); + + // EC-3-2, 5MB block, so all 3 data locations are needed + OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, 5, 5 * ONEMB); + ECBlockInputStream ecb = new ECBlockInputStream(repConfig, ONEMB, + keyInfo, true, new TestBlockInputStreamFactory()); + Assert.assertTrue(ecb.hasSufficientLocations()); + ecb.close(); + + // EC-3-2, very large block, so all 3 data locations are needed + keyInfo = createKeyInfo(repConfig, 5, 5000 * ONEMB); + ecb = new ECBlockInputStream(repConfig, ONEMB, + keyInfo, true, new TestBlockInputStreamFactory()); + Assert.assertTrue(ecb.hasSufficientLocations()); + ecb.close(); + + Map<DatanodeDetails, Integer> dnMap = new HashMap<>(); + + // EC-3-2, 1 byte short of 1MB with 1 location + dnMap.clear(); Review comment: Nit: I don't think we need to clear the map we just created, or am I missing something? ########## File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java ########## @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.client.io; + +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.storage.BlockInputStream; +import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; +import org.apache.hadoop.security.token.Token; + +/** + * Interface used to create BlockInputStreams. + */ +public interface BlockInputStreamFactory { Review comment: Sorry, my bad, I had some assumptions and overlooked a few things here... @sodonnel after you asked offline for some extra explanation of what I was thinking about here, I went and tried it out, and realized that even if it is possible to create a functional interface and use a lambda, you are right: it does not really express anything better... ########## File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java ########## @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.client.io; + +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.scm.XceiverClientFactory; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.storage.BlockInputStream; +import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; +import org.apache.hadoop.security.token.Token; + +import java.util.function.Function; + +/** + * Concrete implementation of a BlockInputStreamFactory to create + * BlockInputSteams in a real cluster. + */ +public class BlockInputStreamFactoryImpl implements BlockInputStreamFactory { + + private final XceiverClientFactory xceiverClientFactory; + private final Function<BlockID, Pipeline> refreshFunction; + + public BlockInputStreamFactoryImpl(XceiverClientFactory xceiverFactory, + Function<BlockID, Pipeline> refreshFunction) { + this.xceiverClientFactory = xceiverFactory; + this.refreshFunction = refreshFunction; + } + + @Override + public BlockInputStream create(BlockID blockId, long blockLen, + Pipeline pipeline, Token<OzoneBlockTokenIdentifier> token, + boolean verifyChecksum) { + return new BlockInputStream(blockId, blockLen, pipeline, token, Review comment: I agree that this is more or less a strategy instead of a factory, in the sense of how it is used, on the other hand it is a strategy, given to the ECBlockInputstream, which provides regular BlockInputStream instances. In this sense it is creating objects. 
I looked into whether it can be implemented as a functional interface and passed in as a lambda, but Stephen is right: that would not make the code any simpler or more readable. If we agree (and I do) that "factory" would overload the factory design concept with a different meaning here, then a good alternative name might be BlockInputStreamProvider or BlockInputStreamCreator, but the code itself is fine. (Though, if we go with provider, I would change the method name to provide instead of create.) One more thing that I would question here is the separation of the refreshFunction and the xceiverClientFactory into internal state inside this type. I am unsure, but most likely ECBlockInputStream will be used from KeyInputStream#addStream(), and in that case we can pass both the client factory and the refresh function to the create/provide method there, and nulls in the current tests. ########## File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java ########## @@ -0,0 +1,322 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.ozone.client.io; + +import com.google.common.base.Preconditions; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.hadoop.fs.ByteBufferReadable; +import org.apache.hadoop.fs.CanUnbuffer; +import org.apache.hadoop.fs.FSExceptionMessages; +import org.apache.hadoop.fs.Seekable; +import org.apache.hadoop.hdds.client.ECReplicationConfig; +import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.hdds.scm.storage.BlockInputStream; +import org.apache.hadoop.hdds.scm.storage.ByteArrayReader; +import org.apache.hadoop.hdds.scm.storage.ByteBufferReader; +import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.Arrays; + +/** + * Class to read data from an EC Block Group. + */ +public class ECBlockInputStream extends InputStream Review comment: Just a note: if we go with ECBlockGroupInputStream, we should also rename ECBlockOutputStream to keep the naming symmetric. I can live with either approach as long as we are consistent with the input/output stream names, but I would go with the current one if it were up to me ;) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
