This is an automated email from the ASF dual-hosted git repository.
ncole pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1318fdb [AMBARI-22936] Remove Code for
ExperimentalFeature.PARALLEL_PROCESSING (#297)
1318fdb is described below
commit 1318fdb8d14cff96d2ef923b5ed38104ee91ff60
Author: ncole <[email protected]>
AuthorDate: Thu Feb 8 11:26:14 2018 -0500
[AMBARI-22936] Remove Code for ExperimentalFeature.PARALLEL_PROCESSING
(#297)
* [AMBARI-22936] Remove Code for ExperimentalFeature.PARALLEL_PROCESSING
---
ambari-server/docs/configuration/index.md | 4 +-
.../ambari/annotations/ExperimentalFeature.java | 8 -
.../server/actionmanager/ActionDBAccessorImpl.java | 41 +---
.../ambari/server/configuration/Configuration.java | 28 +--
.../org/apache/ambari/server/utils/Parallel.java | 266 ---------------------
.../ambari/server/utils/ParallelLoopResult.java | 63 -----
.../server/configuration/ConfigurationTest.java | 13 -
.../apache/ambari/server/utils/TestParallel.java | 223 -----------------
8 files changed, 8 insertions(+), 638 deletions(-)
diff --git a/ambari-server/docs/configuration/index.md
b/ambari-server/docs/configuration/index.md
index a0461e8..2aa6475 100644
--- a/ambari-server/docs/configuration/index.md
+++ b/ambari-server/docs/configuration/index.md
@@ -88,9 +88,9 @@ The following are the properties which can be used to
configure Ambari.
| auto.group.creation | The auto group creation by Ambari |`false` |
| bootstrap.dir | The directory on the Ambari Server file system used for
storing Ambari Agent bootstrap information such as request responses.
|`/var/run/ambari-server/bootstrap` |
| bootstrap.master_host_name | The host name of the Ambari Server which will
be used by the Ambari Agents for communication. | |
-| bootstrap.script | The location and name of the Python script used to
bootstrap new Ambari Agent hosts.
|`/usr/lib/python2.6/site-packages/ambari_server/bootstrap.py` |
+| bootstrap.script | The location and name of the Python script used to
bootstrap new Ambari Agent hosts.
|`/usr/lib/ambari-server/lib/ambari_server/bootstrap.py` |
| bootstrap.setup_agent.password | The password to set on the
`AMBARI_PASSPHRASE` environment variable before invoking the bootstrap script.
|`password` |
-| bootstrap.setup_agent.script | The location and name of the Python script
executed on the Ambari Agent host during the bootstrap process.
|`/usr/lib/python2.6/site-packages/ambari_server/setupAgent.py` |
+| bootstrap.setup_agent.script | The location and name of the Python script
executed on the Ambari Agent host during the bootstrap process.
|`/usr/lib/ambari-server/lib/ambari_server/setupAgent.py` |
| client.api.acceptor.count | Count of acceptors to configure for the jetty
connector used for Ambari API. | |
| client.api.port | The port that client connections will use with the REST
API. The Ambari Web client runs on this port. |`8080` |
| client.api.ssl.cert_pass_file | The filename which contains the password for
the keystores, truststores, and certificates for the REST API when it's
protected by SSL. |`https.pass.txt` |
diff --git
a/ambari-server/src/main/java/org/apache/ambari/annotations/ExperimentalFeature.java
b/ambari-server/src/main/java/org/apache/ambari/annotations/ExperimentalFeature.java
index 35c3c2f..d58e94e 100644
---
a/ambari-server/src/main/java/org/apache/ambari/annotations/ExperimentalFeature.java
+++
b/ambari-server/src/main/java/org/apache/ambari/annotations/ExperimentalFeature.java
@@ -17,8 +17,6 @@
*/
package org.apache.ambari.annotations;
-import java.util.concurrent.Executor;
-
/**
* The {@link ExperimentalFeature} enumeration is meant to be used with the
* {@link Experimental} annotation to indicate which feature set experimental
@@ -26,12 +24,6 @@ import java.util.concurrent.Executor;
*/
public enum ExperimentalFeature {
/**
- * The processing of arbitrary, atomic list elements by an {@link Executor}
in
- * order to arrive at a full processed list faster.
- */
- PARALLEL_PROCESSING,
-
- /**
* The caching of current alert information in order to reduce overall load
on
* the database by preventing frequent updates and JPA entity invalidation.
*/
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
index 063ea1c..aae3441 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
@@ -30,8 +30,6 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import org.apache.ambari.annotations.Experimental;
-import org.apache.ambari.annotations.ExperimentalFeature;
import org.apache.ambari.annotations.TransactionalLock;
import org.apache.ambari.annotations.TransactionalLock.LockArea;
import org.apache.ambari.annotations.TransactionalLock.LockType;
@@ -69,9 +67,6 @@ import org.apache.ambari.server.orm.entities.StageEntity;
import org.apache.ambari.server.security.authorization.AuthorizationHelper;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.Host;
-import org.apache.ambari.server.utils.LoopBody;
-import org.apache.ambari.server.utils.Parallel;
-import org.apache.ambari.server.utils.ParallelLoopResult;
import org.apache.ambari.server.utils.StageUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
@@ -277,7 +272,6 @@ public class ActionDBAccessorImpl implements
ActionDBAccessor {
* {@inheritDoc}
*/
@Override
- @Experimental(feature = ExperimentalFeature.PARALLEL_PROCESSING)
public List<Stage> getStagesInProgressForRequest(Long requestId) {
List<StageEntity> stageEntities =
stageDAO.findByRequestIdAndCommandStatuses(requestId,
HostRoleStatus.IN_PROGRESS_STATUSES);
return getStagesForEntities(stageEntities);
@@ -299,38 +293,13 @@ public class ActionDBAccessorImpl implements
ActionDBAccessor {
return stages;
}
- @Experimental(feature = ExperimentalFeature.PARALLEL_PROCESSING)
private List<Stage> getStagesForEntities(List<StageEntity> stageEntities) {
- // experimentally enable parallel stage processing
- @Experimental(feature = ExperimentalFeature.PARALLEL_PROCESSING)
- boolean useConcurrentStageProcessing =
configuration.isExperimentalConcurrentStageProcessingEnabled();
- if (useConcurrentStageProcessing) {
- ParallelLoopResult<Stage> loopResult = Parallel.forLoop(stageEntities,
- new LoopBody<StageEntity, Stage>() {
- @Override
- public Stage run(StageEntity stageEntity) {
- return stageFactory.createExisting(stageEntity);
- }
- });
- if (loopResult.getIsCompleted()) {
- return loopResult.getResult();
- } else {
- // Fetch any missing results sequentially
- List<Stage> stages = loopResult.getResult();
- for (int i = 0; i < stages.size(); i++) {
- if (stages.get(i) == null) {
- stages.set(i, stageFactory.createExisting(stageEntities.get(i)));
- }
- }
- return stages;
- }
- } else {
- List<Stage> stages = new ArrayList<>(stageEntities.size());
- for (StageEntity stageEntity : stageEntities) {
- stages.add(stageFactory.createExisting(stageEntity));
- }
- return stages;
+ List<Stage> stages = new ArrayList<>(stageEntities.size());
+ for (StageEntity stageEntity : stageEntities) {
+ stages.add(stageFactory.createExisting(stageEntity));
}
+
+ return stages;
}
/**
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
index fa046ec..3a51021 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
@@ -58,14 +58,12 @@ import org.apache.ambari.annotations.Markdown;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.actionmanager.CommandExecutionType;
import org.apache.ambari.server.actionmanager.HostRoleCommand;
-import org.apache.ambari.server.actionmanager.Stage;
import org.apache.ambari.server.controller.spi.PropertyProvider;
import org.apache.ambari.server.controller.utilities.ScalingThreadPoolExecutor;
import org.apache.ambari.server.events.listeners.alerts.AlertReceivedListener;
import org.apache.ambari.server.orm.JPATableGenerationStrategy;
import org.apache.ambari.server.orm.PersistenceType;
import org.apache.ambari.server.orm.dao.HostRoleCommandStatusSummaryDTO;
-import org.apache.ambari.server.orm.entities.StageEntity;
import org.apache.ambari.server.security.ClientSecurityType;
import
org.apache.ambari.server.security.authentication.jwt.JwtAuthenticationProperties;
import
org.apache.ambari.server.security.authentication.kerberos.AmbariKerberosAuthenticationProperties;
@@ -78,7 +76,6 @@ import
org.apache.ambari.server.upgrade.AbstractUpgradeCatalog;
import org.apache.ambari.server.utils.AmbariPath;
import org.apache.ambari.server.utils.DateUtils;
import org.apache.ambari.server.utils.HostUtils;
-import org.apache.ambari.server.utils.Parallel;
import org.apache.ambari.server.utils.PasswordUtils;
import org.apache.ambari.server.utils.ShellCommandUtil;
import org.apache.ambari.server.utils.StageUtils;
@@ -2034,16 +2031,6 @@ public class Configuration {
"server.timeline.metrics.https.enabled", Boolean.FALSE);
/**
- * Governs the use of {@link Parallel} to process {@link StageEntity}
- * instances into {@link Stage}.
- */
- @Markdown(
- internal = true,
- description = "Determines whether to allow stages retrieved from the
database to be processed by multiple threads.")
- public static final ConfigurationProperty<Boolean>
EXPERIMENTAL_CONCURRENCY_STAGE_PROCESSING_ENABLED = new ConfigurationProperty<>(
- "experimental.concurrency.stage_processing.enabled", Boolean.FALSE);
-
- /**
* The full path to the XML file that describes the different alert
templates.
*/
@Markdown(description="The full path to the XML file that describes the
different alert templates.")
@@ -2521,7 +2508,7 @@ public class Configuration {
"notification.dispatch.alert.script.directory",AmbariPath.getPath("/var/lib/ambari-server/resources/scripts"));
@Markdown(description = "Whether security password encryption is enabled or
not. In case it is we store passwords in their own file(s); otherwise we store
passwords in the Ambari credential store.")
- public static final ConfigurationProperty<Boolean>
SECURITY_PASSWORD_ENCRYPTON_ENABLED = new
ConfigurationProperty<Boolean>("security.passwords.encryption.enabled", false);
+ public static final ConfigurationProperty<Boolean>
SECURITY_PASSWORD_ENCRYPTON_ENABLED = new
ConfigurationProperty<>("security.passwords.encryption.enabled", false);
/**
* The maximum number of authentication attempts permitted to a local user.
Once the number of failures reaches this limit the user will be locked out. 0
indicates unlimited failures
@@ -5059,19 +5046,6 @@ public class Configuration {
}
/**
- * Gets whether to use experiemental concurrent processing to convert
- * {@link StageEntity} instances into {@link Stage} instances. The default is
- * {@code false}.
- *
- * @return {code true} if the experimental feature is enabled, {@code false}
- * otherwise.
- */
- @Experimental(feature = ExperimentalFeature.PARALLEL_PROCESSING)
- public boolean isExperimentalConcurrentStageProcessingEnabled() {
- return
Boolean.parseBoolean(getProperty(EXPERIMENTAL_CONCURRENCY_STAGE_PROCESSING_ENABLED));
- }
-
- /**
* If {@code true}, then alerts processed by the {@link
AlertReceivedListener}
* will not write alert data to the database on every event. Instead, data
* like timestamps and text will be kept in a cache and flushed out
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/utils/Parallel.java
b/ambari-server/src/main/java/org/apache/ambari/server/utils/Parallel.java
deleted file mode 100644
index 81ce9d2..0000000
--- a/ambari-server/src/main/java/org/apache/ambari/server/utils/Parallel.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.server.utils;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.eclipse.persistence.internal.helper.ConcurrencyManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * <b>TEMPORARILY DO NOT USE WITH JPA ENTITIES</b>
- * <p/>
- * Deprecated since the use of this class to access JPA from multiple Ambari
- * threads seems to cause thread liveliness problems in
- * {@link ConcurrencyManager}.
- * <p/>
- * This class provides support for parallel loops. Iterations in the loop run
in
- * parallel in parallel loops.
- */
-@Deprecated
-public class Parallel {
-
- /**
- * Max pool size
- */
- private static final int MAX_POOL_SIZE = Math.max(8,
Runtime.getRuntime().availableProcessors());
-
- /**
- * Keep alive time (15 min)
- */
- // !!! changed from 1 second because EclipseLink was making threads idle and
- // they kept timing out
- private static final int KEEP_ALIVE_TIME_MINUTES = 15;
-
- /**
- * Poll duration (10 secs)
- */
- private static final int POLL_DURATION_MILLISECONDS = 10000;
-
- /**
- * Core pool size
- */
- private static final int CORE_POOL_SIZE = 2;
-
- /**
- * Logger
- */
- private static final Logger LOG = LoggerFactory.getLogger(Parallel.class);
-
- /**
- * Thread pool executor
- */
- private static ExecutorService executor = initExecutor();
-
- /**
- * Initialize executor
- *
- * @return
- */
- private static ExecutorService initExecutor() {
-
- BlockingQueue<Runnable> blockingQueue = new SynchronousQueue<>(); // Using
synchronous queue
-
- // Create thread pool
- ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
- CORE_POOL_SIZE, // Core pool size
- MAX_POOL_SIZE, // Max pool size
- KEEP_ALIVE_TIME_MINUTES, // Keep alive time for idle
threads
- TimeUnit.MINUTES,
- blockingQueue, // Using synchronous queue
- new ParallelLoopsThreadFactory(), // Thread pool factory to use
- new ThreadPoolExecutor.CallerRunsPolicy() // Rejected tasks will run
on calling thread.
- );
- threadPool.allowCoreThreadTimeOut(true);
- LOG.debug(
- "Parallel library initialized: ThreadCount = {}, CurrentPoolSize = {},
CorePoolSize = {}, MaxPoolSize = {}",
- Thread.activeCount(), threadPool.getPoolSize(),
threadPool.getCorePoolSize(), threadPool.getMaximumPoolSize());
- return threadPool;
- }
-
- /**
- * Executes a "for" parallel loop operation over all items in the data
source in which iterations run in parallel.
- *
- * @param source Data source to iterate over
- * @param loopBody The loop body that is invoked once per iteration
- * @param <T> The type of data in the source
- * @param <R> The type of data to be returned
- * @return {@link ParallelLoopResult} Parallel loop result
- */
- public static <T, R> ParallelLoopResult<R> forLoop(
- List<T> source,
- final LoopBody<T, R> loopBody) {
-
- if(source == null || source.isEmpty()) {
- return new ParallelLoopResult<>(true, Collections.<R>emptyList());
- }
- return forLoop(source, 0, source.size(), loopBody);
- }
-
- /**
- * Executes a "for" parallel loop operation in which iterations run in
parallel.
- *
- * @param source Data source to iterate over
- * @param startIndex The loop start index, inclusive
- * @param endIndex The loop end index, exclusive
- * @param loopBody The loop body that is invoked once per iteration
- * @param <T> The type of data in the source
- * @param <R> The type of data to be returned
- * @return {@link ParallelLoopResult} Parallel loop result
- *
- */
- public static <T, R> ParallelLoopResult<R> forLoop(
- final List<T> source,
- int startIndex,
- int endIndex,
- final LoopBody<T, R> loopBody) {
-
- if(source == null || source.isEmpty() || startIndex == endIndex) {
- return new ParallelLoopResult<>(true, Collections.<R>emptyList());
- }
- if(startIndex < 0 || startIndex >= source.size()) {
- throw new IndexOutOfBoundsException("startIndex is out of bounds");
- }
- if(endIndex < 0 || endIndex > source.size()) {
- throw new IndexOutOfBoundsException("endIndex is out of bounds");
- }
- if(startIndex > endIndex) {
- throw new IndexOutOfBoundsException("startIndex > endIndex");
- }
- if(source.size() == 1 || (endIndex - startIndex) == 1) {
- // Don't spawn a new thread for a single element list
- List<R> result =
Collections.singletonList(loopBody.run(source.get(startIndex)));
- return new ParallelLoopResult<>(true, result);
- }
-
- // Create a completion service for each call
- CompletionService<ResultWrapper<R>> completionService = new
ExecutorCompletionService<>(executor);
-
- List<Future<ResultWrapper<R>>> futures = new LinkedList<>();
- for (int i = startIndex; i < endIndex; i++) {
- final Integer k = i;
- Future<ResultWrapper<R>> future = completionService.submit(new
Callable<ResultWrapper<R>>() {
- @Override
- public ResultWrapper<R> call() throws Exception {
- ResultWrapper<R> res = new ResultWrapper<>();
- res.index = k;
- res.result = loopBody.run(source.get(k));
- return res;
- }
- });
- futures.add(future);
- }
-
- boolean completed = true;
- R[] result = (R[]) new Object[futures.size()];
- for (int i = 0; i < futures.size(); i++) {
- try {
- Future<ResultWrapper<R>> futureResult = null;
- try {
- futureResult = completionService.poll(POLL_DURATION_MILLISECONDS,
TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- LOG.error("Caught InterruptedException in Parallel.forLoop", e);
- }
- if (futureResult == null) {
- // Timed out! no progress was made during the last poll duration.
Abort the threads and cancel the threads.
- LOG.error("Completion service in Parallel.forLoop timed out!");
- completed = false;
- for(int fIndex = 0; fIndex < futures.size(); fIndex++) {
- Future<ResultWrapper<R>> future = futures.get(fIndex);
- if(future.isDone()) {
- LOG.debug(" Task - {} has already completed", fIndex);
- } else if(future.isCancelled()) {
- LOG.debug(" Task - {} has already been cancelled", fIndex);
- } else if(!future.cancel(true)) {
- LOG.debug(" Task - {} could not be cancelled", fIndex);
- } else {
- LOG.debug(" Task - {} successfully cancelled", fIndex);
- }
- }
- // Finished processing all futures
- break;
- } else {
- ResultWrapper<R> res = futureResult.get();
- if(res.result != null) {
- result[res.index] = res.result;
- } else {
- LOG.error("Result is null for {}", res.index);
- completed = false;
- }
- }
- } catch (InterruptedException e) {
- LOG.error("Caught InterruptedException in Parallel.forLoop", e);
- completed = false;
- } catch (ExecutionException e) {
- LOG.error("Caught ExecutionException in Parallel.forLoop", e);
- completed = false;
- } catch (CancellationException e) {
- LOG.error("Caught CancellationException in Parallel.forLoop", e);
- completed = false;
- }
- }
- // Return parallel loop result
- return new ParallelLoopResult<>(completed, Arrays.asList(result));
- }
-
- /**
- * A custom {@link ThreadFactory} for the threads that will handle
- * {@link org.apache.ambari.server.utils.Parallel} loop iterations.
- */
- private static final class ParallelLoopsThreadFactory implements
ThreadFactory {
-
- private static final AtomicInteger threadId = new AtomicInteger(1);
-
- /**
- * {@inheritDoc}
- */
- @Override
- public Thread newThread(Runnable r) {
- Thread thread = Executors.defaultThreadFactory().newThread(r);
- thread.setName("parallel-loop-" + threadId.getAndIncrement());
- return thread;
- }
- }
-
- /**
- * Result wrapper for Parallel.forLoop used internally
- * @param <R> Type of result to wrap
- */
- private static final class ResultWrapper<R> {
- int index;
- R result;
- }
-}
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/utils/ParallelLoopResult.java
b/ambari-server/src/main/java/org/apache/ambari/server/utils/ParallelLoopResult.java
deleted file mode 100644
index 0602925..0000000
---
a/ambari-server/src/main/java/org/apache/ambari/server/utils/ParallelLoopResult.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.server.utils;
-
-import java.util.List;
-
-/**
- * Provides completion status and results of a {@link Parallel} loop
- * @param <R> Result type
- */
-public class ParallelLoopResult<R> {
- private boolean isCompleted;
- private List<R> result;
-
- /**
- * Flag to indicate if the parallel loop completed all iterations
- * @return
- */
- public boolean getIsCompleted() {
- return isCompleted;
- }
-
- /**
- * Flag to indicate if the parallel loop completed all iterations
- * @return
- */
- public void setIsCompleted(boolean completed) {
- isCompleted = completed;
- }
-
- public List<R> getResult() {
- return result;
- }
-
- public void setResult(List<R> result) {
- this.result = result;
- }
-
- /**
- * Constructor
- * @param completed Indicates if the parallel loop completed all iterations
- * @param result Results of parallel loop. Results could be partially
completed.
- */
- public ParallelLoopResult(boolean completed, List<R> result) {
- isCompleted = completed;
- this.result = result;
- }
-}
diff --git
a/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java
b/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java
index cef8903..6b79e75 100644
---
a/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java
+++
b/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java
@@ -515,19 +515,6 @@ public class ConfigurationTest {
}
@Test
- public void testExperimentalConcurrentStageProcessing() throws Exception {
- final Properties ambariProperties = new Properties();
- final Configuration configuration = new Configuration(ambariProperties);
-
-
Assert.assertFalse(configuration.isExperimentalConcurrentStageProcessingEnabled());
-
-
ambariProperties.setProperty(Configuration.EXPERIMENTAL_CONCURRENCY_STAGE_PROCESSING_ENABLED.getKey(),
- Boolean.TRUE.toString());
-
-
Assert.assertTrue(configuration.isExperimentalConcurrentStageProcessingEnabled());
- }
-
- @Test
public void testServerLocksProfilingEnabled() throws Exception {
final Properties ambariProperties = new Properties();
final Configuration configuration = new Configuration(ambariProperties);
diff --git
a/ambari-server/src/test/java/org/apache/ambari/server/utils/TestParallel.java
b/ambari-server/src/test/java/org/apache/ambari/server/utils/TestParallel.java
deleted file mode 100644
index b4a32cc..0000000
---
a/ambari-server/src/test/java/org/apache/ambari/server/utils/TestParallel.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.server.utils;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.junit.Test;
-
-import junit.framework.Assert;
-
-/**
- * Tests parallel loops
- */
-public class TestParallel {
-
- /**
- * Tests {@link org.apache.ambari.server.utils.Parallel} forLoop base cases.
- * @throws Exception
- */
- @Test
- public void testParallelForLoopBaseCases() throws Exception {
-
- ParallelLoopResult<Integer> nullLoopResult = Parallel.forLoop(
- null,
- new LoopBody<Integer, Integer>() {
- @Override
- public Integer run(Integer integer) {
- return integer;
- }
- });
- Assert.assertTrue(nullLoopResult.getIsCompleted());
- Assert.assertTrue(nullLoopResult.getResult().isEmpty());
-
- ParallelLoopResult<Integer> emptyLoopResult = Parallel.forLoop(
- new ArrayList<>(),
- new LoopBody<Integer, Integer>() {
- @Override
- public Integer run(Integer integer) {
- return integer;
- }
- });
- Assert.assertTrue(emptyLoopResult.getIsCompleted());
- Assert.assertTrue(emptyLoopResult.getResult().isEmpty());
-
- ParallelLoopResult<Integer> singleElementLoopResult = Parallel.forLoop(
- Collections.singletonList(7),
- new LoopBody<Integer, Integer>() {
- @Override
- public Integer run(Integer integer) {
- return integer;
- }
- });
- Assert.assertTrue(singleElementLoopResult.getIsCompleted());
- List<Integer> singleElementList = singleElementLoopResult.getResult();
- Assert.assertTrue(singleElementLoopResult.getIsCompleted());
- Assert.assertFalse(singleElementList.isEmpty());
- Assert.assertEquals(1, singleElementList.size());
- Assert.assertNotNull(singleElementList.get(0));
- }
-
- /**
- * Tests Parallel.forLoop
- * @throws Exception
- */
- @Test
- public void testParallelForLoop() throws Exception {
- final List<Integer> input = new LinkedList<>();
- for(int i = 0; i < 10; i++) {
- input.add(i);
- }
-
- ParallelLoopResult<Integer> loopResult = Parallel.forLoop(input, new
LoopBody<Integer, Integer>() {
- @Override
- public Integer run(Integer in1) {
- return in1 * in1;
- }
- });
- Assert.assertTrue(loopResult.getIsCompleted());
- Assert.assertNotNull(loopResult.getResult());
-
- List<Integer> output = loopResult.getResult();
- Assert.assertEquals(input.size(), output.size());
- for(int i = 0; i < input.size(); i++) {
- Assert.assertEquals( i * i, (int)output.get(i));
- }
- }
-
- /**
- * Tests nested {@link org.apache.ambari.server.utils.Parallel} forLoop
- * @throws Exception
- */
- @Test
- public void testNestedParallelForLoop() throws Exception {
- final List<Integer> input = new LinkedList<>();
- for(int i = 0; i < 10; i++) {
- input.add(i);
- }
- final ParallelLoopResult<Integer>[] innerLoopResults = new
ParallelLoopResult[input.size()];
- ParallelLoopResult<Integer> loopResult = Parallel.forLoop(input, new
LoopBody<Integer, Integer>() {
- @Override
- public Integer run(final Integer in1) {
- int sq = in1 * in1;
- ParallelLoopResult<Integer> innerLoopResult = Parallel.forLoop(input,
new LoopBody<Integer, Integer>() {
- @Override
- public Integer run(Integer in2) {
- return in1 * in2;
- }
- });
- innerLoopResults[in1] = innerLoopResult;
- return in1 * in1;
- }
- });
- Assert.assertNotNull(loopResult);
- Assert.assertTrue(loopResult.getIsCompleted());
- List<Integer> output = loopResult.getResult();
- Assert.assertNotNull(output);
- Assert.assertEquals(input.size(), output.size());
-
- for(int i = 0; i < input.size(); i++) {
- Assert.assertEquals(i * i, (int) output.get(i));
- ParallelLoopResult<Integer> innerLoopResult = innerLoopResults[i];
- Assert.assertNotNull(innerLoopResult);
- Assert.assertTrue(innerLoopResult.getIsCompleted());
- List<Integer> innerOutput = innerLoopResult.getResult();
- Assert.assertNotNull(innerOutput);
- Assert.assertEquals(input.size(), innerOutput.size());
-
- for(int j = 0; j < input.size(); j++) {
- Assert.assertEquals(i*j, (int) innerOutput.get(j));
- }
- }
- }
-
- /**
- * Tests {@link org.apache.ambari.server.utils.Parallel} forLoop iteration
failures
- * @throws Exception
- */
- @Test
- public void testParallelForLoopIterationFailures() throws Exception {
- final List<Integer> input = new LinkedList<>();
- for(int i = 0; i < 10; i++) {
- input.add(i);
- }
- final List<Integer> failForList = Arrays.asList(new Integer[] { 2, 5, 7});
- ParallelLoopResult<Integer> loopResult = Parallel.forLoop(input, new
LoopBody<Integer, Integer>() {
- @Override
- public Integer run(Integer in1) {
- if(failForList.contains(in1)) {
- // Return null
- return null;
- }
- return in1 * in1;
- }
- });
- Assert.assertFalse(loopResult.getIsCompleted());
- Assert.assertNotNull(loopResult.getResult());
- List<Integer> output = loopResult.getResult();
- Assert.assertEquals(input.size(), output.size());
-
- for(int i = 0; i < input.size(); i++) {
- if(failForList.contains(i)) {
- Assert.assertNull(output.get(i));
- output.set(i, i * i);
- } else {
- Assert.assertEquals(i * i, (int) output.get(i));
- }
- }
- }
-
- /**
- * Tests {@link org.apache.ambari.server.utils.Parallel} forLoop iteration
exceptions
- * @throws Exception
- */
- @Test
- public void testParallelForLoopIterationExceptions() throws Exception {
- final List<Integer> input = new LinkedList<>();
- for(int i = 0; i < 10; i++) {
- input.add(i);
- }
- final List<Integer> failForList = Arrays.asList(new Integer[] { 2, 5, 7});
- ParallelLoopResult<Integer> loopResult = Parallel.forLoop(input, new
LoopBody<Integer, Integer>() {
- @Override
- public Integer run(Integer in1) {
- if(failForList.contains(in1)) {
- throw new RuntimeException("Ignore this exception");
- }
- return in1 * in1;
- }
- });
- Assert.assertFalse(loopResult.getIsCompleted());
- Assert.assertNotNull(loopResult.getResult());
- List<Integer> output = loopResult.getResult();
- Assert.assertEquals(input.size(), output.size());
-
- for(int i = 0; i < input.size(); i++) {
- if(failForList.contains(i)) {
- Assert.assertNull(output.get(i));
- output.set(i, i * i);
- } else {
- Assert.assertEquals(i * i, (int) output.get(i));
- }
- }
- }
-}
--
To stop receiving notification emails like this one, please contact
[email protected].