pudidic commented on code in PR #3444:
URL: https://github.com/apache/hive/pull/3444#discussion_r943047337
########## ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/ExportService.java: ##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.repl.dump;
+
+import com.cronutils.utils.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/*
+ * A service to dump/export table and partition export jobs in parallel
+ */
+public class ExportService {
+  /*
+   * Executor service to execute runnable export jobs.
+   */
+  private ExecutorService execService;
+  private static final Logger LOG = LoggerFactory.getLogger(ExportService.class);
+  private ExportService() {
+  }
+
+  /*
+   * A singleton instance of ExportService.
+   */
+  private static final ExportService INSTANCE = new ExportService();
+
+  /*

Review Comment:
   Please read the Javadoc documentation at https://www.oracle.com/technical-resources/articles/java/javadoc-tool.html for its styling. /** is preferred when you write Javadoc comments.
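By way of illustration, a minimal sketch of the /** Javadoc style being requested, applied to the declarations from the diff above; the comment wording is illustrative, not taken from the patch:

```java
import java.util.concurrent.ExecutorService;

/**
 * A service that dumps table and partition export jobs in parallel.
 */
public class ExportService {

  /** Executor service that executes the submitted export jobs. */
  private ExecutorService execService;

  /** The singleton instance of ExportService. */
  private static final ExportService INSTANCE = new ExportService();

  private ExportService() {
  }
}
```

The javadoc tool only picks up comments that open with /**, so the /* comments in the diff would be silently skipped when generating API documentation.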
########## ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java: ##########
@@ -186,6 +189,10 @@ public String getName() {
 
   @Override
   public int execute() {
+    // Get ExportService and configure it with given parallelism specified by
+    // REPL_TABLE_DUMP_PARALLELISM
+    exportService = ExportService.getInstance();

Review Comment:
   How about creating the ExportService instance in the HiveServer2 class? Please refer to HiveServer2#maybeStartCompactorThreads for how it configures and starts threads.

########## ql/src/test/org/apache/hadoop/hive/ql/parse/repl/dump/TestExportService.java: ##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.repl.dump;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.junit.Test;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({LoggerFactory.class, ExportService.class})
+
+public class TestExportService {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(TestExportService.class);
+
+  @Mock
+  HiveConf conf;
+
+  private final int nThreads = 10;
+
+  private static int taskNumber = 0;
+
+  private final int totalTask = 10;
+
+  private Semaphore sem;
+
+  public ExportJob runParallelTask() {
+    return new ExportJob() {
+      @Override
+      public void run() {
+        Assert.assertTrue(sem.tryAcquire());
+        ++taskNumber;
+        LOG.debug("Current task number is: {} and thread is: {} ", taskNumber, Thread.currentThread().getName());
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  @After
+  public void finalize()
+  {
+    for (int i = 0; i < totalTask; i++) {
+      sem.release();
+    }
+  }
+
+  @Test
+  public void testExportServiceWithParallelism() throws Exception {
+    when(conf.getIntVar(HiveConf.ConfVars.REPL_TABLE_DUMP_PARALLELISM)).thenReturn(nThreads);
+    taskNumber = 0;
+    sem = new Semaphore(totalTask);
+    ExportService exportService = ExportService.getInstance();
+    exportService.configure(conf, true);
+    for (int i = 0; i < totalTask; i++) {
+      exportService.submit(runParallelTask());
+    }
+    final long actualNumTasksExecuted = exportService.getTotalTaskEverExecuted();
+    Assert.assertEquals(totalTask, actualNumTasksExecuted);
+    exportService.shutdown();
+    exportService.await(Long.MAX_VALUE, TimeUnit.SECONDS);

Review Comment:
   Please consider a test failure case with the Long.MAX_VALUE waiting time. Can a few minutes or shorter catch it?
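As a hedged sketch of the bounded wait being suggested, the tail of the test could look like the following, assuming ExportService#await mirrors ExecutorService#awaitTermination and returns false once the timeout elapses (that return type is an assumption, not something the diff confirms):

```java
exportService.shutdown();
// A bounded wait lets a hung export job fail the test promptly instead of
// blocking the build forever; the 2-minute value is illustrative, not from the patch.
boolean finished = exportService.await(2, TimeUnit.MINUTES);
Assert.assertTrue("Export jobs did not finish within the timeout", finished);
```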
########## ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java: ##########
@@ -75,64 +77,62 @@ class PartitionExport {
     this.callersSession = SessionState.get();
   }
 
-  List<DataCopyPath> write(final ReplicationSpec forReplicationSpec, boolean isExportTask,
+  public void exportPartitions(final ReplicationSpec forReplicationSpec, boolean isExportTask,
+                               FileList fileList, boolean dataCopyAtLoad) throws RuntimeException {

Review Comment:
   Please introduce an additional prefix and keep the same base name. Both the backing and wrapping methods share the same logical behavior, but in a different thread model. For example, parallelXxx(...) and xxx(...) can help readers see what they represent. Additionally, it's better to have a Javadoc comment that explicitly says parallelXxx is a parallelized version of xxx; you can use the @see tag for its reference. exportPartitions and write may lead readers to think that they do different things, and exportPartitions doesn't give a good clue about the thread model. The same advice applies to the exportTable method, too.
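As an illustration of the suggested convention, a hedged sketch reusing the signature from the diff; parallelWrite is a hypothetical name following the parallelXxx pattern, and the Javadoc text is illustrative:

```java
/**
 * Parallelized version of {@link #write}: submits the partition export to the
 * shared ExportService rather than running it on the calling thread.
 *
 * @see #write(ReplicationSpec, boolean, FileList, boolean)
 */
public void parallelWrite(final ReplicationSpec forReplicationSpec, final boolean isExportTask,
    final FileList fileList, final boolean dataCopyAtLoad) {
  exportService.submit(new ExportJob() {
    @Override
    public void run() {
      try {
        write(forReplicationSpec, isExportTask, fileList, dataCopyAtLoad);
      } catch (InterruptedException | HiveException e) {
        throw new RuntimeException(e.getMessage(), e);
      }
    }
  });
}
```

Keeping write private and routing callers through parallelWrite would preserve a single code path while making the thread model obvious at the call site.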
########## ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java: ##########
@@ -75,64 +77,62 @@ class PartitionExport {
     this.callersSession = SessionState.get();
   }
 
-  List<DataCopyPath> write(final ReplicationSpec forReplicationSpec, boolean isExportTask,
+  public void exportPartitions(final ReplicationSpec forReplicationSpec, boolean isExportTask,
+                               FileList fileList, boolean dataCopyAtLoad) throws RuntimeException {
+    exportService.submit(new ExportJob() {
+      @Override
+      public void run() {
+        try {
+          write(forReplicationSpec, isExportTask, fileList, dataCopyAtLoad);
+        } catch(InterruptedException e) {
+          throw new RuntimeException(e.getCause().getMessage(), e);
+        } catch (HiveException e) {
+          throw new RuntimeException(e.getCause().getMessage(), e);
+        }
+      }
+    }
+    );
+  }
+  private List<DataCopyPath> write(final ReplicationSpec forReplicationSpec, boolean isExportTask,
     FileList fileList, boolean dataCopyAtLoad) throws InterruptedException, HiveException {
-    List<Future<?>> futures = new LinkedList<>();
     List<DataCopyPath> managedTableCopyPaths = new LinkedList<>();
+    List<Future<?>> futures = new LinkedList<>();
     ExecutorService producer = Executors.newFixedThreadPool(1,
-        new ThreadFactoryBuilder().setNameFormat("partition-submitter-thread-%d").build());
+            new ThreadFactoryBuilder().setNameFormat("partition-submitter-thread-%d").build());
     futures.add(producer.submit(() -> {
       SessionState.setCurrentSessionState(callersSession);
       for (Partition partition : partitionIterable) {
         try {
           queue.put(partition);
         } catch (InterruptedException e) {
           throw new RuntimeException(
-              "Error while queuing up the partitions for export of data files", e);
+                  "Error while queuing up the partitions for export of data files", e);
         }
       }
     }));
     producer.shutdown();
     ThreadFactory namingThreadFactory =
-        new ThreadFactoryBuilder().setNameFormat("partition-dump-thread-%d").build();
+            new ThreadFactoryBuilder().setNameFormat("partition-dump-thread-%d").build();
     ExecutorService consumer = Executors.newFixedThreadPool(nThreads, namingThreadFactory);
     while (!producer.isTerminated() || !queue.isEmpty()) {
-      /*
-      This is removed using a poll because there can be a case where there partitions iterator is empty
-      but because both the producer and consumer are started simultaneously the while loop will execute
-      because producer is not terminated but it wont produce anything so queue will be empty and then we
-      should only wait for a specific time before continuing, as the next loop cycle will fail.
-      */
+      /*
+      This is removed using a poll because there can be a case where there partitions iterator is empty
+      but because both the producer and consumer are started simultaneously the while loop will execute
+      because producer is not terminated but it wont produce anything so queue will be empty and then we
+      should only wait for a specific time before continuing, as the next loop cycle will fail.
+      */
       Partition partition = queue.poll(1, TimeUnit.SECONDS);

Review Comment:
   Please respect the original indentation. Sometimes your indentation is better, but other committers may have a different opinion, and we don't want repetitive formatting changes to accumulate. It also keeps patch files much cleaner.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
