http://git-wip-us.apache.org/repos/asf/hadoop/blob/9897538a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/service/ResourceEstimatorService.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/service/ResourceEstimatorService.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/service/ResourceEstimatorService.java new file mode 100644 index 0000000..933332e --- /dev/null +++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/service/ResourceEstimatorService.java @@ -0,0 +1,238 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.resourceestimator.service; + +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.Type; +import java.util.List; +import java.util.Map; + +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.resourceestimator.common.api.RecurrenceId; +import org.apache.hadoop.resourceestimator.common.api.ResourceSkyline; +import org.apache.hadoop.resourceestimator.common.config.ResourceEstimatorConfiguration; +import org.apache.hadoop.resourceestimator.common.config.ResourceEstimatorUtil; +import org.apache.hadoop.resourceestimator.common.exception.ResourceEstimatorException; +import org.apache.hadoop.resourceestimator.common.serialization.RLESparseResourceAllocationSerDe; +import org.apache.hadoop.resourceestimator.common.serialization.ResourceSerDe; +import org.apache.hadoop.resourceestimator.skylinestore.api.SkylineStore; +import org.apache.hadoop.resourceestimator.skylinestore.exceptions.SkylineStoreException; +import org.apache.hadoop.resourceestimator.solver.api.Solver; +import org.apache.hadoop.resourceestimator.solver.exceptions.SolverException; +import org.apache.hadoop.resourceestimator.translator.api.LogParser; +import org.apache.hadoop.resourceestimator.translator.impl.LogParserUtil; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.reflect.TypeToken; +import com.google.inject.Singleton; + +/** + * Resource Estimator Service which provides a set of REST APIs for users to + * use the estimation service. + */ +@Singleton @Path("/resourceestimator") public class ResourceEstimatorService { + private static final Logger LOGGER = + LoggerFactory.getLogger(ResourceEstimatorService.class); + private static SkylineStore skylineStore; + private static Solver solver; + private static LogParser logParser; + private static LogParserUtil logParserUtil = new LogParserUtil(); + private static Configuration config; + private static Gson gson; + private static Type rleType; + private static Type skylineStoreType; + + public ResourceEstimatorService() throws ResourceEstimatorException { + if (skylineStore == null) { + try { + config = new Configuration(); + config.addResource(ResourceEstimatorConfiguration.CONFIG_FILE); + skylineStore = ResourceEstimatorUtil.createProviderInstance(config, + ResourceEstimatorConfiguration.SKYLINESTORE_PROVIDER, + ResourceEstimatorConfiguration.DEFAULT_SKYLINESTORE_PROVIDER, + SkylineStore.class); + logParser = ResourceEstimatorUtil.createProviderInstance(config, + ResourceEstimatorConfiguration.TRANSLATOR_PROVIDER, + ResourceEstimatorConfiguration.DEFAULT_TRANSLATOR_PROVIDER, + LogParser.class); + logParser.init(config, skylineStore); + logParserUtil.setLogParser(logParser); + solver = ResourceEstimatorUtil.createProviderInstance(config, + ResourceEstimatorConfiguration.SOLVER_PROVIDER, + ResourceEstimatorConfiguration.DEFAULT_SOLVER_PROVIDER, + Solver.class); + solver.init(config, skylineStore); + } catch (Exception ex) { + LOGGER + .error("Server initialization failed due to: {}", ex.getMessage()); + throw new ResourceEstimatorException(ex.getMessage(), ex); + } + gson = new GsonBuilder() + .registerTypeAdapter(Resource.class, new ResourceSerDe()) + .registerTypeAdapter(RLESparseResourceAllocation.class, + new RLESparseResourceAllocationSerDe()) + .enableComplexMapKeySerialization().create(); + rleType = new TypeToken<RLESparseResourceAllocation>() { + }.getType(); + skylineStoreType = + new TypeToken<Map<RecurrenceId, List<ResourceSkyline>>>() { + }.getType(); + } + } + + /** + * Parse the log file. See also {@link LogParser#parseStream(InputStream)}. + * + * @param logFile file/directory of the log to be parsed. + * @throws IOException if fails to parse the log. + * @throws SkylineStoreException if fails to addHistory to + * {@link SkylineStore}. + * @throws ResourceEstimatorException if the {@link LogParser} + * is not initialized. + */ + @POST @Path("/translator/{logFile : .+}") public void parseFile( + @PathParam("logFile") String logFile) + throws IOException, SkylineStoreException, ResourceEstimatorException { + logParserUtil.parseLog(logFile); + LOGGER.debug("Parse logFile: {}.", logFile); + } + + /** + * Get predicted {code Resource} allocation for the pipeline. If the + * prediction for the pipeline already exists in the {@link SkylineStore}, it + * will directly get the prediction from {@link SkylineStore}, otherwise it + * will call the {@link Solver} to make prediction, and store the predicted + * {code Resource} allocation to the {@link SkylineStore}. Note that invoking + * {@link Solver} could be a time-consuming operation. + * + * @param pipelineId the id of the pipeline. + * @return Json format of {@link RLESparseResourceAllocation}. + * @throws SolverException if {@link Solver} fails; + * @throws SkylineStoreException if fails to get history + * {@link ResourceSkyline} or predicted {code Resource} allocation + * from {@link SkylineStore}. + */ + @GET @Path("/estimator/{pipelineId}") @Produces(MediaType.APPLICATION_JSON) + public String getPrediction( + @PathParam(value = "pipelineId") String pipelineId) + throws SolverException, SkylineStoreException { + // first, try to grab the predicted resource allocation from the skyline + // store + RLESparseResourceAllocation result = skylineStore.getEstimation(pipelineId); + // if received resource allocation is null, then run the solver + if (result == null) { + RecurrenceId recurrenceId = new RecurrenceId(pipelineId, "*"); + Map<RecurrenceId, List<ResourceSkyline>> jobHistory = + skylineStore.getHistory(recurrenceId); + result = solver.solve(jobHistory); + } + final String prediction = gson.toJson(result, rleType); + LOGGER.debug("Predict resource requests for pipelineId: {}." + pipelineId); + + return prediction; + } + + /** + * Get history {@link ResourceSkyline} from {@link SkylineStore}. This + * function supports the following special wildcard operations regarding + * {@link RecurrenceId}: If the {@code pipelineId} is "*", it will return all + * entries in the store; else, if the {@code runId} is "*", it will return all + * {@link ResourceSkyline}s belonging to the {@code pipelineId}; else, it will + * return all {@link ResourceSkyline}s belonging to the {{@code pipelineId}, + * {@code runId}}. If the {@link RecurrenceId} does not exist, it will not do + * anything. + * + * @param pipelineId pipelineId of the history run. + * @param runId runId of the history run. + * @return Json format of history {@link ResourceSkyline}s. + * @throws SkylineStoreException if fails to getHistory + * {@link ResourceSkyline} from {@link SkylineStore}. + */ + @GET @Path("/skylinestore/history/{pipelineId}/{runId}") + @Produces(MediaType.APPLICATION_JSON) + public String getHistoryResourceSkyline( + @PathParam("pipelineId") String pipelineId, + @PathParam("runId") String runId) throws SkylineStoreException { + RecurrenceId recurrenceId = new RecurrenceId(pipelineId, runId); + Map<RecurrenceId, List<ResourceSkyline>> jobHistory = + skylineStore.getHistory(recurrenceId); + final String skyline = gson.toJson(jobHistory, skylineStoreType); + LOGGER + .debug("Query the skyline store for recurrenceId: {}." + recurrenceId); + + recurrenceId = new RecurrenceId("*", "*"); + jobHistory = skylineStore.getHistory(recurrenceId); + + return skyline; + } + + /** + * Get estimated {code Resource} allocation for the pipeline. + * + * @param pipelineId id of the pipeline. + * @return Json format of {@link RLESparseResourceAllocation}. + * @throws SkylineStoreException if fails to get estimated {code Resource} + * allocation from {@link SkylineStore}. + */ + @GET @Path("/skylinestore/estimation/{pipelineId}") + @Produces(MediaType.APPLICATION_JSON) + public String getEstimatedResourceAllocation( + @PathParam("pipelineId") String pipelineId) throws SkylineStoreException { + RLESparseResourceAllocation result = skylineStore.getEstimation(pipelineId); + final String skyline = gson.toJson(result, rleType); + LOGGER.debug("Query the skyline store for pipelineId: {}." + pipelineId); + + return skyline; + } + + /** + * Delete history {@link ResourceSkyline}s from {@link SkylineStore}. + * <p> Note that for safety considerations, we only allow users to delete + * history {@link ResourceSkyline}s of one job run. + * + * @param pipelineId pipelineId of the history run. + * @param runId runId runId of the history run. + * @throws SkylineStoreException if fails to deleteHistory + * {@link ResourceSkyline}s. + */ + @DELETE @Path("/skylinestore/history/{pipelineId}/{runId}") + public void deleteHistoryResourceSkyline( + @PathParam("pipelineId") String pipelineId, + @PathParam("runId") String runId) throws SkylineStoreException { + RecurrenceId recurrenceId = new RecurrenceId(pipelineId, runId); + skylineStore.deleteHistory(recurrenceId); + LOGGER.info("Delete ResourceSkyline for recurrenceId: {}.", recurrenceId); + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9897538a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/service/ShutdownHook.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/service/ShutdownHook.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/service/ShutdownHook.java new file mode 100644 index 0000000..23e1413 --- /dev/null +++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/service/ShutdownHook.java @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.resourceestimator.service; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Simple shutdown hook for {@link ResourceEstimatorServer}. + */ +public class ShutdownHook extends Thread { + private static final Logger LOGGER = + LoggerFactory.getLogger(ShutdownHook.class); + private final ResourceEstimatorServer server; + + ShutdownHook(ResourceEstimatorServer server) { + this.server = server; + } + + public void run() { + try { + server.shutdown(); + } catch (Exception e) { + LOGGER.error("HttpServer fails to shut down!"); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9897538a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/service/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/service/package-info.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/service/package-info.java new file mode 100644 index 0000000..3571736 --- /dev/null +++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/service/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Resource estimator service. + */ + +package org.apache.hadoop.resourceestimator.service; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/9897538a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/api/HistorySkylineStore.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/api/HistorySkylineStore.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/api/HistorySkylineStore.java new file mode 100644 index 0000000..8fe4619 --- /dev/null +++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/api/HistorySkylineStore.java @@ -0,0 +1,99 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.resourceestimator.skylinestore.api; + +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.resourceestimator.common.api.RecurrenceId; +import org.apache.hadoop.resourceestimator.common.api.ResourceSkyline; +import org.apache.hadoop.resourceestimator.skylinestore.exceptions.SkylineStoreException; + +/** + * HistorySkylineStore stores pipeline job's {@link ResourceSkyline}s in all + * runs. {@code Estimator} will query the {@link ResourceSkyline}s for pipeline + * jobs. {@code Parser} will parse various types of job logs, construct + * {@link ResourceSkyline}s out of the logs and store them in the SkylineStore. + */ +public interface HistorySkylineStore { + /** + * Add job's resource skyline to the <em>store</em> indexed by the job's + * {@link RecurrenceId}. {@link RecurrenceId} is used to identify recurring + * pipeline jobs, and we assume that {@code + * ResourceEstimatorServer} users will provide the correct + * {@link RecurrenceId}. <p> If {@link ResourceSkyline}s to be added contain + * <em>null</em> elements, the function will skip them. + * + * @param recurrenceId the unique id of user's recurring pipeline jobs. + * @param resourceSkylines the list of {@link ResourceSkyline}s in one run. + * @throws SkylineStoreException if: (1) input parameters are invalid; (2) + * {@link ResourceSkyline}s to be added contain some duplicate + * {@link RecurrenceId}s which already exist in the + * {@link HistorySkylineStore}. + */ + void addHistory(RecurrenceId recurrenceId, + List<ResourceSkyline> resourceSkylines) throws SkylineStoreException; + + /** + * Delete all {@link ResourceSkyline}s belonging to given + * {@link RecurrenceId}. + * <p> Note that for safety considerations, we only allow users to + * deleteHistory {@link ResourceSkyline}s of one job run. + * + * @param recurrenceId the unique id of user's recurring pipeline jobs. + * @throws SkylineStoreException if: (1) input parameters are invalid; (2) + * recurrenceId does not exist in the {@link HistorySkylineStore}. + */ + void deleteHistory(RecurrenceId recurrenceId) throws SkylineStoreException; + + /** + * Update {@link RecurrenceId} with given {@link ResourceSkyline}s. This + * function will deleteHistory all the {@link ResourceSkyline}s belonging to + * the {@link RecurrenceId}, and re-insert the given {@link ResourceSkyline}s + * to the SkylineStore. + * <p> If {@link ResourceSkyline}s contain <em>null</em> elements, + * the function will skip them. + * + * @param recurrenceId the unique id of the pipeline job. + * @param resourceSkylines the list of {@link ResourceSkyline}s in one run. + * @throws SkylineStoreException if: (1) input parameters are invalid; (2) + * recurrenceId does not exist in the SkylineStore. + */ + void updateHistory(RecurrenceId recurrenceId, + List<ResourceSkyline> resourceSkylines) throws SkylineStoreException; + + /** + * Return all {@link ResourceSkyline}s belonging to {@link RecurrenceId}. + * <p> This function supports the following special wildcard operations + * regarding {@link RecurrenceId}: If the {@code pipelineId} is "*", it will + * return all entries in the store; else, if the {@code runId} is "*", it + * will return all {@link ResourceSkyline}s belonging to the + * {@code pipelineId}; else, it will return all {@link ResourceSkyline}s + * belonging to the {{@code pipelineId}, {@code runId}}. If the + * {@link RecurrenceId} does not exist, it will return <em>null</em>. + * + * @param recurrenceId the unique id of the pipeline job. + * @return all {@link ResourceSkyline}s belonging to the recurrenceId. + * @throws SkylineStoreException if recurrenceId is <em>null</em>. + */ + Map<RecurrenceId, List<ResourceSkyline>> getHistory(RecurrenceId recurrenceId) + throws SkylineStoreException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9897538a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/api/PredictionSkylineStore.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/api/PredictionSkylineStore.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/api/PredictionSkylineStore.java new file mode 100644 index 0000000..c3fedce --- /dev/null +++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/api/PredictionSkylineStore.java @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.resourceestimator.skylinestore.api; + +import org.apache.hadoop.resourceestimator.skylinestore.exceptions.SkylineStoreException; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; + +/** + * PredictionSkylineStore stores the predicted + * {@code RLESparseResourceAllocation} of a job as computed by the + * {@code Estimator} based on the {@code ResourceSkyline}s of past executions in + * the {@code HistorySkylineStore}. + */ +public interface PredictionSkylineStore { + + /** + * Add job's predicted {@code Resource} allocation to the <em>store</em> + * indexed by the {@code + * pipelineId}. + * <p> Note that right now we only keep the latest copy of predicted + * {@code Resource} allocation for the recurring pipeline. + * + * @param pipelineId the id of the recurring pipeline. + * @param resourceOverTime the predicted {@code Resource} allocation for the + * pipeline. + * @throws SkylineStoreException if input parameters are invalid. + */ + void addEstimation(String pipelineId, + RLESparseResourceAllocation resourceOverTime) + throws SkylineStoreException; + + /** + * Return the predicted {@code Resource} allocation for the pipeline. + * <p> If the pipelineId does not exist, it will return <em>null</em>. + * + * @param pipelineId the unique id of the pipeline. + * @return the predicted {@code Resource} allocation for the pipeline. + * @throws SkylineStoreException if pipelineId is <em>null</em>. + */ + RLESparseResourceAllocation getEstimation(String pipelineId) + throws SkylineStoreException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9897538a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/api/SkylineStore.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/api/SkylineStore.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/api/SkylineStore.java new file mode 100644 index 0000000..f352ed4 --- /dev/null +++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/api/SkylineStore.java @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.resourceestimator.skylinestore.api; + +/** + * SkylineStore is composable interface for storing the history + * {@code ResourceSkyline}s of past job runs and the predicted + * {@code RLESparseResourceAllocation} for future execution. + */ +public interface SkylineStore + extends HistorySkylineStore, PredictionSkylineStore { +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9897538a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/api/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/api/package-info.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/api/package-info.java new file mode 100644 index 0000000..e833486 --- /dev/null +++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/api/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * APIs for the {@code SkylineStore}. + */ + +package org.apache.hadoop.resourceestimator.skylinestore.api; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/9897538a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/DuplicateRecurrenceIdException.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/DuplicateRecurrenceIdException.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/DuplicateRecurrenceIdException.java new file mode 100644 index 0000000..7c92480 --- /dev/null +++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/DuplicateRecurrenceIdException.java @@ -0,0 +1,33 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.resourceestimator.skylinestore.exceptions; + +/** + * Exception thrown the {@code RecurrenceId} already exists in the + * {@code SkylineStore}. + */ +public class DuplicateRecurrenceIdException extends SkylineStoreException { + private static final long serialVersionUID = -684069387367879218L; + + public DuplicateRecurrenceIdException(final String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9897538a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/EmptyResourceSkylineException.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/EmptyResourceSkylineException.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/EmptyResourceSkylineException.java new file mode 100644 index 0000000..55a8fa7 --- /dev/null +++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/EmptyResourceSkylineException.java @@ -0,0 +1,33 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.resourceestimator.skylinestore.exceptions; + +/** + * Exception thrown if the @link{ResourceSkyline}s to be added to the + * {@code SkylineStore} is empty. + */ +public class EmptyResourceSkylineException extends SkylineStoreException { + private static final long serialVersionUID = -684069387367879218L; + + public EmptyResourceSkylineException(final String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9897538a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/NullPipelineIdException.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/NullPipelineIdException.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/NullPipelineIdException.java new file mode 100644 index 0000000..d48be7d --- /dev/null +++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/NullPipelineIdException.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.resourceestimator.skylinestore.exceptions; + +/** + * Exception thrown when pipelineId to be added is <em>null</em>. + */ +public class NullPipelineIdException extends SkylineStoreException { + private static final long serialVersionUID = -684069387367879218L; + + public NullPipelineIdException(final String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9897538a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/NullRLESparseResourceAllocationException.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/NullRLESparseResourceAllocationException.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/NullRLESparseResourceAllocationException.java new file mode 100644 index 0000000..9aee0b6 --- /dev/null +++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/NullRLESparseResourceAllocationException.java @@ -0,0 +1,33 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.resourceestimator.skylinestore.exceptions; + +/** + * Exception thrown if the {@code ResourceSkyline} to be added is <em>null</em>. + */ +public class NullRLESparseResourceAllocationException + extends SkylineStoreException { + private static final long serialVersionUID = -684069387367879218L; + + public NullRLESparseResourceAllocationException(final String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9897538a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/NullRecurrenceIdException.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/NullRecurrenceIdException.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/NullRecurrenceIdException.java new file mode 100644 index 0000000..518c065 --- /dev/null +++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/NullRecurrenceIdException.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.resourceestimator.skylinestore.exceptions; + +/** + * Exception thrown the {@code RecurrenceId} to be added is <em>null</em>. + */ +public class NullRecurrenceIdException extends SkylineStoreException { + private static final long serialVersionUID = -684069387367879218L; + + public NullRecurrenceIdException(final String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9897538a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/NullResourceSkylineException.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/NullResourceSkylineException.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/NullResourceSkylineException.java new file mode 100644 index 0000000..b70c764 --- /dev/null +++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/NullResourceSkylineException.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.resourceestimator.skylinestore.exceptions; + +/** + * Exception thrown if the {@code ResourceSkyline} to be added is <em>null</em>. + */ +public class NullResourceSkylineException extends SkylineStoreException { + private static final long serialVersionUID = -684069387367879218L; + + public NullResourceSkylineException(final String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9897538a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/RecurrenceIdNotFoundException.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/RecurrenceIdNotFoundException.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/RecurrenceIdNotFoundException.java new file mode 100644 index 0000000..b5e734d --- /dev/null +++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/RecurrenceIdNotFoundException.java @@ -0,0 +1,33 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.resourceestimator.skylinestore.exceptions; + +/** + * Exception thrown if {@code RecurrenceId} is not found in the + * {@code SkylineStore}. + */ +public class RecurrenceIdNotFoundException extends SkylineStoreException { + private static final long serialVersionUID = -684069387367879218L; + + public RecurrenceIdNotFoundException(final String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9897538a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/SkylineStoreException.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/SkylineStoreException.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/SkylineStoreException.java new file mode 100644 index 0000000..751b5dd --- /dev/null +++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/SkylineStoreException.java @@ -0,0 +1,33 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.resourceestimator.skylinestore.exceptions; + +/** + * Exception thrown the @link{SkylineStore} or the {@code Estimator} tries to + * addHistory or query pipeline job's resource skylines. + */ +public abstract class SkylineStoreException extends Exception { + private static final long serialVersionUID = -684069387367879218L; + + public SkylineStoreException(final String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9897538a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/package-info.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/package-info.java new file mode 100644 index 0000000..716e090 --- /dev/null +++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/exceptions/package-info.java @@ -0,0 +1,24 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/** + * SkylineStore exception module. + */ +package org.apache.hadoop.resourceestimator.skylinestore.exceptions; http://git-wip-us.apache.org/repos/asf/hadoop/blob/9897538a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/impl/InMemoryStore.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/impl/InMemoryStore.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/impl/InMemoryStore.java new file mode 100644 index 0000000..e00f3a0 --- /dev/null +++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/impl/InMemoryStore.java @@ -0,0 +1,256 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.resourceestimator.skylinestore.impl; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.hadoop.resourceestimator.common.api.RecurrenceId; +import org.apache.hadoop.resourceestimator.common.api.ResourceSkyline; +import org.apache.hadoop.resourceestimator.skylinestore.api.SkylineStore; +import org.apache.hadoop.resourceestimator.skylinestore.exceptions.DuplicateRecurrenceIdException; +import org.apache.hadoop.resourceestimator.skylinestore.exceptions.EmptyResourceSkylineException; +import org.apache.hadoop.resourceestimator.skylinestore.exceptions.RecurrenceIdNotFoundException; +import org.apache.hadoop.resourceestimator.skylinestore.exceptions.SkylineStoreException; +import org.apache.hadoop.resourceestimator.skylinestore.validator.SkylineStoreValidator; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An in-memory implementation of {@link SkylineStore}. + */ +public class InMemoryStore implements SkylineStore { + private static final Logger LOGGER = + LoggerFactory.getLogger(InMemoryStore.class); + private final ReentrantReadWriteLock readWriteLock = + new ReentrantReadWriteLock(); + private final Lock readLock = readWriteLock.readLock(); + private final Lock writeLock = readWriteLock.writeLock(); + private final SkylineStoreValidator inputValidator = + new SkylineStoreValidator(); + /** + * A pipeline job's history {@link ResourceSkyline}s. TODO: we may flatten it + * out for quick access. + */ + private final Map<RecurrenceId, List<ResourceSkyline>> skylineStore = + new HashMap<>(); // pipelineId, resource skyline + // Recurring pipeline's predicted {@link ResourceSkyline}s. + private final Map<String, RLESparseResourceAllocation> estimationStore = + new HashMap<>(); // pipelineId, ResourceSkyline + + private List<ResourceSkyline> eliminateNull( + final List<ResourceSkyline> resourceSkylines) { + final List<ResourceSkyline> result = new ArrayList<>(); + for (final ResourceSkyline resourceSkyline : resourceSkylines) { + if (resourceSkyline != null) { + result.add(resourceSkyline); + } + } + return result; + } + + @Override public final void addHistory(final RecurrenceId recurrenceId, + final List<ResourceSkyline> resourceSkylines) + throws SkylineStoreException { + inputValidator.validate(recurrenceId, resourceSkylines); + writeLock.lock(); + try { + // remove the null elements in the resourceSkylines + final List<ResourceSkyline> filteredInput = + eliminateNull(resourceSkylines); + if (filteredInput.size() > 0) { + if (skylineStore.containsKey(recurrenceId)) { + // if filteredInput has duplicate jobIds with existing skylines in the + // store, + // throw out an exception + final List<ResourceSkyline> jobHistory = + skylineStore.get(recurrenceId); + final List<String> oldJobIds = new ArrayList<>(); + for (final ResourceSkyline resourceSkyline : jobHistory) { + oldJobIds.add(resourceSkyline.getJobId()); + } + if (!oldJobIds.isEmpty()) { + for (ResourceSkyline elem : filteredInput) { + if (oldJobIds.contains(elem.getJobId())) { + StringBuilder errMsg = new StringBuilder(); + errMsg.append( + "Trying to addHistory duplicate resource skylines for " + + recurrenceId + + ". Use updateHistory function instead."); + LOGGER.error(errMsg.toString()); + throw new DuplicateRecurrenceIdException(errMsg.toString()); + } + } + } + skylineStore.get(recurrenceId).addAll(filteredInput); + LOGGER.info("Successfully addHistory new resource skylines for {}.", + recurrenceId); + } else { + skylineStore.put(recurrenceId, filteredInput); + LOGGER.info("Successfully addHistory new resource skylines for {}.", + recurrenceId); + } + } + } finally { + writeLock.unlock(); + } + } + + @Override public void addEstimation(String pipelineId, + RLESparseResourceAllocation resourceSkyline) + throws SkylineStoreException { + inputValidator.validate(pipelineId, resourceSkyline); + writeLock.lock(); + try { + estimationStore.put(pipelineId, resourceSkyline); + LOGGER.info("Successfully add estimated resource allocation for {}.", + pipelineId); + } finally { + writeLock.unlock(); + } + } + + @Override public final void deleteHistory(final RecurrenceId recurrenceId) + throws SkylineStoreException { + inputValidator.validate(recurrenceId); + writeLock.lock(); + try { + if (skylineStore.containsKey(recurrenceId)) { + skylineStore.remove(recurrenceId); + LOGGER.warn("Delete resource skylines for {}.", recurrenceId); + } else { + StringBuilder errMsg = new StringBuilder(); + errMsg.append( + "Trying to deleteHistory non-existing recurring pipeline " + + recurrenceId + "\'s resource skylines"); + LOGGER.error(errMsg.toString()); + throw new RecurrenceIdNotFoundException(errMsg.toString()); + } + } finally { + writeLock.unlock(); + } + } + + @Override public final void updateHistory(final RecurrenceId recurrenceId, + final List<ResourceSkyline> resourceSkylines) + throws SkylineStoreException { + inputValidator.validate(recurrenceId, resourceSkylines); + writeLock.lock(); + try { + if (skylineStore.containsKey(recurrenceId)) { + // remove the null elements in the resourceSkylines + List<ResourceSkyline> filteredInput = eliminateNull(resourceSkylines); + if (filteredInput.size() > 0) { + skylineStore.put(recurrenceId, filteredInput); + LOGGER.info("Successfully updateHistory resource skylines for {}.", + recurrenceId); + } else { + StringBuilder errMsg = new StringBuilder(); + errMsg.append("Trying to updateHistory " + recurrenceId + + " with empty resource skyline"); + LOGGER.error(errMsg.toString()); + throw new EmptyResourceSkylineException(errMsg.toString()); + } + } else { + StringBuilder errMsg = new StringBuilder(); + errMsg.append( + "Trying to updateHistory non-existing resource skylines for " + + recurrenceId); + LOGGER.error(errMsg.toString()); + throw new RecurrenceIdNotFoundException(errMsg.toString()); + } + } finally { + writeLock.unlock(); + } + } + + @Override public final Map<RecurrenceId, List<ResourceSkyline>> getHistory( + final RecurrenceId recurrenceId) throws SkylineStoreException { + inputValidator.validate(recurrenceId); + readLock.lock(); + try { + String pipelineId = recurrenceId.getPipelineId(); + // User tries to getHistory all resource skylines in the skylineStore + if (pipelineId.equals("*")) { + LOGGER + .info("Successfully query resource skylines for {}.", recurrenceId); + return Collections.unmodifiableMap(skylineStore); + } + String runId = recurrenceId.getRunId(); + Map<RecurrenceId, List<ResourceSkyline>> result = + new HashMap<RecurrenceId, List<ResourceSkyline>>(); + // User tries to getHistory pipelineId's all resource skylines in the + // skylineStore + if (runId.equals("*")) { + // TODO: this for loop is expensive, so we may change the type of + // skylineStore to + // speed up this loop. + for (Map.Entry<RecurrenceId, List<ResourceSkyline>> entry : skylineStore + .entrySet()) { + RecurrenceId index = entry.getKey(); + if (index.getPipelineId().equals(pipelineId)) { + result.put(index, entry.getValue()); + } + } + if (result.size() > 0) { + LOGGER.info("Successfully query resource skylines for {}.", + recurrenceId); + return Collections.unmodifiableMap(result); + } else { + LOGGER.warn( + "Trying to getHistory non-existing resource skylines for {}.", + recurrenceId); + return null; + } + } + // User tries to getHistory {pipelineId, runId}'s resource skylines + if (skylineStore.containsKey(recurrenceId)) { + result.put(recurrenceId, skylineStore.get(recurrenceId)); + } else { + LOGGER + .warn("Trying to getHistory non-existing resource skylines for {}.", + recurrenceId); + return null; + } + LOGGER.info("Successfully query resource skylines for {}.", recurrenceId); + return Collections.unmodifiableMap(result); + } finally { + readLock.unlock(); + } + } + + @Override public final RLESparseResourceAllocation getEstimation( + String pipelineId) throws SkylineStoreException { + inputValidator.validate(pipelineId); + readLock.lock(); + try { + return estimationStore.get(pipelineId); + } finally { + readLock.unlock(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9897538a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/impl/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/impl/package-info.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/impl/package-info.java new file mode 100644 index 0000000..ffccd5d --- /dev/null +++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/impl/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Implementation for {@code SkylineStore}. + */ + +package org.apache.hadoop.resourceestimator.skylinestore.impl; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/9897538a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/validator/SkylineStoreValidator.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/validator/SkylineStoreValidator.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/validator/SkylineStoreValidator.java new file mode 100644 index 0000000..f5f50f5 --- /dev/null +++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/validator/SkylineStoreValidator.java @@ -0,0 +1,118 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.resourceestimator.skylinestore.validator; + +import java.util.List; + +import org.apache.hadoop.resourceestimator.common.api.RecurrenceId; +import org.apache.hadoop.resourceestimator.common.api.ResourceSkyline; +import org.apache.hadoop.resourceestimator.skylinestore.api.SkylineStore; +import org.apache.hadoop.resourceestimator.skylinestore.exceptions.NullPipelineIdException; +import org.apache.hadoop.resourceestimator.skylinestore.exceptions.NullRLESparseResourceAllocationException; +import org.apache.hadoop.resourceestimator.skylinestore.exceptions.NullRecurrenceIdException; +import org.apache.hadoop.resourceestimator.skylinestore.exceptions.NullResourceSkylineException; +import org.apache.hadoop.resourceestimator.skylinestore.exceptions.SkylineStoreException; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * SkylineStoreValidator validates input parameters for {@link SkylineStore}. + */ +public class SkylineStoreValidator { + private static final Logger LOGGER = + LoggerFactory.getLogger(SkylineStoreValidator.class); + + /** + * Check if recurrenceId is <em>null</em>. + * + * @param recurrenceId the id of the recurring pipeline job. + * @throws SkylineStoreException if input parameters are invalid. + */ + public final void validate(final RecurrenceId recurrenceId) + throws SkylineStoreException { + if (recurrenceId == null) { + StringBuilder sb = new StringBuilder(); + sb.append("Recurrence id is null, please try again by specifying" + + " a valid Recurrence id."); + LOGGER.error(sb.toString()); + throw new NullRecurrenceIdException(sb.toString()); + } + } + + /** + * Check if pipelineId is <em>null</em>. + * + * @param pipelineId the id of the recurring pipeline job. + * @throws SkylineStoreException if input parameters are invalid. + */ + public final void validate(final String pipelineId) + throws SkylineStoreException { + if (pipelineId == null) { + StringBuilder sb = new StringBuilder(); + sb.append("pipelineId is null, please try again by specifying" + + " a valid pipelineId."); + LOGGER.error(sb.toString()); + throw new NullPipelineIdException(sb.toString()); + } + } + + /** + * Check if recurrenceId is <em>null</em> or resourceSkylines is + * <em>null</em>. + * + * @param recurrenceId the id of the recurring pipeline job. + * @param resourceSkylines the list of {@link ResourceSkyline}s to be added. + * @throws SkylineStoreException if input parameters are invalid. + */ + public final void validate(final RecurrenceId recurrenceId, + final List<ResourceSkyline> resourceSkylines) + throws SkylineStoreException { + validate(recurrenceId); + if (resourceSkylines == null) { + StringBuilder sb = new StringBuilder(); + sb.append("ResourceSkylines for " + recurrenceId + + " is null, please try again by " + + "specifying valid ResourceSkylines."); + LOGGER.error(sb.toString()); + throw new NullResourceSkylineException(sb.toString()); + } + } + + /** + * Check if pipelineId is <em>null</em> or resourceOverTime is <em>null</em>. + * + * @param pipelineId the id of the recurring pipeline. + * @param resourceOverTime predicted {@code Resource} allocation to be added. + * @throws SkylineStoreException if input parameters are invalid. + */ + public final void validate(final String pipelineId, + final RLESparseResourceAllocation resourceOverTime) + throws SkylineStoreException { + validate(pipelineId); + if (resourceOverTime == null) { + StringBuilder sb = new StringBuilder(); + sb.append("Resource allocation for " + pipelineId + " is null."); + LOGGER.error(sb.toString()); + throw new NullRLESparseResourceAllocationException(sb.toString()); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9897538a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/validator/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/validator/package-info.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/validator/package-info.java new file mode 100644 index 0000000..23d67c5 --- /dev/null +++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/skylinestore/validator/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Validator for {@code SkylineStore}. + */ + +package org.apache.hadoop.resourceestimator.skylinestore.validator; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/9897538a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/api/Solver.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/api/Solver.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/api/Solver.java new file mode 100644 index 0000000..7958a6f --- /dev/null +++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/api/Solver.java @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.resourceestimator.solver.api; + +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.resourceestimator.common.api.RecurrenceId; +import org.apache.hadoop.resourceestimator.common.api.ResourceSkyline; +import org.apache.hadoop.resourceestimator.skylinestore.api.PredictionSkylineStore; +import org.apache.hadoop.resourceestimator.skylinestore.exceptions.SkylineStoreException; +import org.apache.hadoop.resourceestimator.solver.exceptions.SolverException; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; + +/** + * Solver takes recurring pipeline's {@link ResourceSkyline} history as input, + * predicts its {@link Resource} requirement at each time t for the next run, + * and translate them into {@link ResourceSkyline} which will be used to make + * recurring resource reservations. + */ +public interface Solver { + /** + * Initializing the Solver, including loading solver parameters from + * configuration file. + * + * @param config {@link Configuration} for the Solver. + * @param skylineStore the {@link PredictionSkylineStore} which stores + * predicted {@code Resource} allocations. + */ + void init(Configuration config, PredictionSkylineStore skylineStore); + + /** + * The Solver reads recurring pipeline's {@link ResourceSkyline} history, and + * precits its {@link ResourceSkyline} requirements for the next run. + * + * @param jobHistory the {@link ResourceSkyline}s of the recurring pipeline in + * previous runs. The {@link RecurrenceId} identifies one run of the + * recurring pipeline, and the list of {@link ResourceSkyline}s + * records the {@link ResourceSkyline} of each job within the pipeline. + * @return the amount of {@link Resource} requested by the pipeline for the + * next run (discretized by timeInterval). + * @throws SolverException if: (1) input is invalid; (2) the number of + * instances in the jobHistory is smaller than the minimum + * requirement; (3) solver runtime has unexpected behaviors; + * @throws SkylineStoreException if it fails to add predicted {@code Resource} + * allocation to the {@link PredictionSkylineStore}. + */ + RLESparseResourceAllocation solve( + Map<RecurrenceId, List<ResourceSkyline>> jobHistory) + throws SolverException, SkylineStoreException; + + /** + * Release the resource used by the Solver. + */ + void close(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9897538a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/api/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/api/package-info.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/api/package-info.java new file mode 100644 index 0000000..fc8363d --- /dev/null +++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/api/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * API for {@code Solver}. + */ + +package org.apache.hadoop.resourceestimator.solver.api; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/9897538a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/exceptions/InvalidInputException.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/exceptions/InvalidInputException.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/exceptions/InvalidInputException.java new file mode 100644 index 0000000..ff51f5f --- /dev/null +++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/exceptions/InvalidInputException.java @@ -0,0 +1,34 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.resourceestimator.solver.exceptions; + +/** + * Exception thrown the {@code SkylineStore} or the {@code Estimator} tries to + * addHistory or query pipeline job's resource skylines. + */ +public class InvalidInputException extends SolverException { + + private static final long serialVersionUID = -684069387367879218L; + + public InvalidInputException(final String entity, final String reason) { + super(entity + " is " + reason + ", please try again with valid " + entity); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9897538a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/exceptions/InvalidSolverException.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/exceptions/InvalidSolverException.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/exceptions/InvalidSolverException.java new file mode 100644 index 0000000..9b614b6 --- /dev/null +++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/exceptions/InvalidSolverException.java @@ -0,0 +1,34 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.resourceestimator.solver.exceptions; + +/** + * Exception thrown the @link{SkylineStore} or the {@code Estimator} tries to + * addHistory or query pipeline job's resource skylines. + */ +public class InvalidSolverException extends SolverException { + + private static final long serialVersionUID = -684069387367879218L; + + public InvalidSolverException(final String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9897538a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/exceptions/SolverException.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/exceptions/SolverException.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/exceptions/SolverException.java new file mode 100644 index 0000000..57507ea --- /dev/null +++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/exceptions/SolverException.java @@ -0,0 +1,34 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.resourceestimator.solver.exceptions; + +/** + * Exception thrown the @link{SkylineStore} or the {@code Estimator} tries to + * addHistory or query pipeline job's resource skylines. + */ +public abstract class SolverException extends Exception { + + private static final long serialVersionUID = -684069387367879218L; + + public SolverException(final String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9897538a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/exceptions/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/exceptions/package-info.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/exceptions/package-info.java new file mode 100644 index 0000000..bd45324 --- /dev/null +++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/exceptions/package-info.java @@ -0,0 +1,24 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/** + * Exception module. + */ +package org.apache.hadoop.resourceestimator.solver.exceptions; http://git-wip-us.apache.org/repos/asf/hadoop/blob/9897538a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/impl/BaseSolver.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/impl/BaseSolver.java b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/impl/BaseSolver.java new file mode 100644 index 0000000..55abb1c --- /dev/null +++ b/hadoop-tools/hadoop-resourceestimator/src/main/java/org/apache/hadoop/resourceestimator/solver/impl/BaseSolver.java @@ -0,0 +1,94 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.resourceestimator.solver.impl; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.resourceestimator.common.config.ResourceEstimatorConfiguration; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; +import org.apache.hadoop.yarn.api.records.ReservationRequests; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; + +/** + * Common functions shared by {@code Solver} (translate predicted resource + * allocation into Hadoop's {@link ReservationSubmissionRequest}. + */ +public abstract class BaseSolver { + /** + * Used to generate {@link ReservationId}. + */ + private static final Random RAND = new Random(); + + /** + * Translate the estimated {@link Resource} requirements of the pipeline to + * Hadoop's {@link ReservationSubmissionRequest}. + * + * @param containerSpec the {@link Resource} to be allocated to each + * container; + * @param containerRequests the predicted {@link Resource} to be allocated to + * the job in each discrete time intervals; + * @param config configuration file for BaseSolver. + * @return {@link ReservationSubmissionRequest} to be submitted to Hadoop to + * make recurring resource reservation for the pipeline. + */ + public final ReservationSubmissionRequest toRecurringRDL( + final Resource containerSpec, + final RLESparseResourceAllocation containerRequests, + final Configuration config) { + final int timeInterval = + config.getInt(ResourceEstimatorConfiguration.TIME_INTERVAL_KEY, 5); + long pipelineSubmissionTime = containerRequests.getEarliestStartTime(); + long pipelineFinishTime = containerRequests.getLatestNonNullTime(); + final long containerMemAlloc = containerSpec.getMemorySize(); + final long jobLen = + (pipelineFinishTime - pipelineSubmissionTime) / timeInterval; + List<ReservationRequest> reservationRequestList = new ArrayList<>(); + for (int i = 0; i < jobLen; i++) { + // container spec, # of containers, concurrency, duration + ReservationRequest reservationRequest = ReservationRequest + .newInstance(containerSpec, (int) ( + containerRequests.getCapacityAtTime(i * timeInterval) + .getMemorySize() / containerMemAlloc), 1, timeInterval); + reservationRequestList.add(reservationRequest); + } + ReservationRequests reservationRequests = ReservationRequests + .newInstance(reservationRequestList, + ReservationRequestInterpreter.R_ALL); + ReservationDefinition reservationDefinition = ReservationDefinition + .newInstance(pipelineSubmissionTime, pipelineFinishTime, + reservationRequests, "LpSolver#toRecurringRDL"); + ReservationId reservationId = + ReservationId.newInstance(RAND.nextLong(), RAND.nextLong()); + ReservationSubmissionRequest reservationSubmissionRequest = + ReservationSubmissionRequest + .newInstance(reservationDefinition, "resourceestimator", + reservationId); + return reservationSubmissionRequest; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org