[REEF-128] Replace the protocol buffer use in the runtime API with POJOs The POJOs are directly translated from protocol buffers to POJOs with Builders. By removing the protocol buffer translation, the POJOs should be able to be used more flexibly in the future, e.g. to resolve REEF-69.
JIRA: [REEF-128](https://issues.apache.org/jira/browse/REEF-128) Pull Request: This closes #145 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/089be44d Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/089be44d Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/089be44d Branch: refs/heads/master Commit: 089be44d5e3704e73ed39648520fc662f93e740e Parents: 7396f31 Author: Brian Cho <[email protected]> Authored: Wed Apr 15 20:51:37 2015 +0900 Committer: Markus Weimer <[email protected]> Committed: Tue Apr 21 22:47:58 2015 -0700 ---------------------------------------------------------------------- lang/java/reef-common/pom.xml | 1 - .../common/client/JobSubmissionHelper.java | 26 +-- .../common/client/REEFImplementation.java | 10 +- .../common/client/api/JobSubmissionEvent.java | 81 ++++++++ .../client/api/JobSubmissionEventImpl.java | 198 +++++++++++++++++++ .../common/client/api/JobSubmissionHandler.java | 3 +- .../common/driver/EvaluatorRequestorImpl.java | 4 +- .../common/driver/api/ResourceLaunchEvent.java | 63 ++++++ .../driver/api/ResourceLaunchEventImpl.java | 134 +++++++++++++ .../driver/api/ResourceLaunchHandler.java | 3 +- .../common/driver/api/ResourceReleaseEvent.java | 37 ++++ .../driver/api/ResourceReleaseEventImpl.java | 64 ++++++ .../driver/api/ResourceReleaseHandler.java | 3 +- .../common/driver/api/ResourceRequestEvent.java | 74 +++++++ .../driver/api/ResourceRequestEventImpl.java | 178 +++++++++++++++++ .../driver/api/ResourceRequestHandler.java | 3 +- .../common/driver/api/RuntimeParameters.java | 13 +- .../driver/catalog/ResourceCatalogImpl.java | 6 +- .../evaluator/AllocatedEvaluatorImpl.java | 36 ++-- .../driver/evaluator/EvaluatorManager.java | 30 +-- .../evaluator/EvaluatorManagerFactory.java | 25 +-- .../common/driver/evaluator/Evaluators.java | 4 +- .../resourcemanager/NodeDescriptorEvent.java | 59 ++++++ .../NodeDescriptorEventImpl.java | 127 ++++++++++++ .../resourcemanager/NodeDescriptorHandler.java | 5 +- .../ResourceAllocationEvent.java | 54 +++++ .../ResourceAllocationEventImpl.java | 111 +++++++++++ .../ResourceAllocationHandler.java | 5 +- .../resourcemanager/ResourceManagerStatus.java | 35 ++-- .../resourcemanager/ResourceStatusEvent.java | 59 ++++++ .../ResourceStatusEventImpl.java | 128 ++++++++++++ .../resourcemanager/ResourceStatusHandler.java | 21 +- .../resourcemanager/RuntimeStatusEvent.java | 61 ++++++ .../resourcemanager/RuntimeStatusEventImpl.java | 132 +++++++++++++ .../reef/runtime/common/files/FileResource.java | 45 +++++ .../runtime/common/files/FileResourceImpl.java | 94 +++++++++ .../reef/runtime/common/files/FileType.java | 31 +++ .../reef/runtime/common/files/JobJarMaker.java | 15 +- .../reef/runtime/common/launch/ProcessType.java | 27 +++ .../runtime/common/launch/REEFMessageCodec.java | 8 +- .../java/org/apache/reef/util/BuilderUtils.java | 35 ++++ .../src/main/proto/client_runtime.proto | 16 -- .../src/main/proto/driver_runtime.proto | 89 --------- .../src/main/proto/reef_protocol.proto | 3 +- .../src/main/proto/reef_service_protos.proto | 25 --- .../driver/EvaluatorRequestorImplTest.java | 18 +- .../common/driver/catalog/CatalogTest.java | 4 +- .../client/HDInsightJobSubmissionHandler.java | 44 ++--- .../reef/runtime/local/client/DriverFiles.java | 17 +- .../local/client/LocalJobSubmissionHandler.java | 13 +- .../runtime/local/driver/ContainerManager.java | 10 +- .../driver/LocalResourceLaunchHandler.java | 4 +- .../driver/LocalResourceReleaseHandler.java | 4 +- .../driver/LocalResourceRequestHandler.java | 4 +- .../runtime/local/driver/ResourceManager.java | 54 ++--- .../runtime/local/driver/ResourceRequest.java | 12 +- .../local/driver/ResourceRequestQueue.java | 4 +- .../process/ReefRunnableProcessObserver.java | 15 +- .../local/driver/ResourceRequestQueueTest.java | 4 +- .../local/driver/ResourceRequestTest.java | 4 +- .../mesos/client/MesosJobSubmissionHandler.java | 24 +-- .../driver/MesosResourceLaunchHandler.java | 20 +- .../driver/MesosResourceReleaseHandler.java | 6 +- .../driver/MesosResourceRequestHandler.java | 6 +- .../runtime/mesos/driver/REEFEventHandlers.java | 32 +-- .../runtime/mesos/driver/REEFScheduler.java | 119 +++++------ .../yarn/client/YarnJobSubmissionHandler.java | 45 ++--- .../yarn/driver/EvaluatorSetupHelper.java | 16 +- .../runtime/yarn/driver/REEFEventHandlers.java | 45 +++-- .../yarn/driver/YARNResourceLaunchHandler.java | 22 +-- .../yarn/driver/YARNResourceReleaseHandler.java | 6 +- .../yarn/driver/YarnContainerManager.java | 35 ++-- .../yarn/driver/YarnResourceRequestHandler.java | 34 ++-- 73 files changed, 2230 insertions(+), 572 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/pom.xml ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/pom.xml b/lang/java/reef-common/pom.xml index 6768a4b..c6efe04 100644 --- a/lang/java/reef-common/pom.xml +++ b/lang/java/reef-common/pom.xml @@ -58,7 +58,6 @@ under the License. <arg value="src/main/proto/reef_service_protos.proto"/> <arg value="src/main/proto/evaluator_runtime.proto"/> <arg value="src/main/proto/client_runtime.proto"/> - <arg value="src/main/proto/driver_runtime.proto"/> <arg value="src/main/proto/reef_protocol.proto"/> </exec> </tasks> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/JobSubmissionHelper.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/JobSubmissionHelper.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/JobSubmissionHelper.java index 6b89c09..964ae8f 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/JobSubmissionHelper.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/JobSubmissionHelper.java @@ -19,8 +19,10 @@ package org.apache.reef.runtime.common.client; import org.apache.reef.driver.parameters.*; -import org.apache.reef.proto.ClientRuntimeProtocol; -import org.apache.reef.proto.ReefServiceProtos; +import org.apache.reef.runtime.common.client.api.JobSubmissionEventImpl; +import org.apache.reef.runtime.common.files.FileResource; +import org.apache.reef.runtime.common.files.FileResourceImpl; +import org.apache.reef.runtime.common.files.FileType; import org.apache.reef.tang.Configuration; import org.apache.reef.tang.Injector; import org.apache.reef.tang.Tang; @@ -71,34 +73,34 @@ final class JobSubmissionHelper { * @throws InjectionException * @throws IOException */ - final ClientRuntimeProtocol.JobSubmissionProto.Builder getJobsubmissionProto(final Configuration driverConfiguration) throws InjectionException, IOException { + final JobSubmissionEventImpl.Builder getJobSubmissionBuilder(final Configuration driverConfiguration) throws InjectionException, IOException { final Injector injector = Tang.Factory.getTang().newInjector(driverConfiguration); - final ClientRuntimeProtocol.JobSubmissionProto.Builder jbuilder = ClientRuntimeProtocol.JobSubmissionProto.newBuilder() + final JobSubmissionEventImpl.Builder jbuilder = JobSubmissionEventImpl.newBuilder() .setIdentifier(returnOrGenerateDriverId(injector.getNamedInstance(DriverIdentifier.class))) .setDriverMemory(injector.getNamedInstance(DriverMemory.class)) .setUserName(System.getProperty("user.name")) - .setConfiguration(configurationSerializer.toString(driverConfiguration)); + .setConfiguration(driverConfiguration); for (final String globalFileName : injector.getNamedInstance(JobGlobalFiles.class)) { LOG.log(Level.FINEST, "Adding global file: {0}", globalFileName); - jbuilder.addGlobalFile(getFileResourceProto(globalFileName, ReefServiceProtos.FileType.PLAIN)); + jbuilder.addGlobalFile(getFileResourceProto(globalFileName, FileType.PLAIN)); } for (final String globalLibraryName : injector.getNamedInstance(JobGlobalLibraries.class)) { LOG.log(Level.FINEST, "Adding global library: {0}", globalLibraryName); - jbuilder.addGlobalFile(getFileResourceProto(globalLibraryName, ReefServiceProtos.FileType.LIB)); + jbuilder.addGlobalFile(getFileResourceProto(globalLibraryName, FileType.LIB)); } for (final String localFileName : injector.getNamedInstance(DriverLocalFiles.class)) { LOG.log(Level.FINEST, "Adding local file: {0}", localFileName); - jbuilder.addLocalFile(getFileResourceProto(localFileName, ReefServiceProtos.FileType.PLAIN)); + jbuilder.addLocalFile(getFileResourceProto(localFileName, FileType.PLAIN)); } for (final String localLibraryName : injector.getNamedInstance(DriverLocalLibraries.class)) { LOG.log(Level.FINEST, "Adding local library: {0}", localLibraryName); - jbuilder.addLocalFile(getFileResourceProto(localLibraryName, ReefServiceProtos.FileType.LIB)); + jbuilder.addLocalFile(getFileResourceProto(localLibraryName, FileType.LIB)); } return jbuilder; @@ -130,7 +132,7 @@ final class JobSubmissionHelper { * @return * @throws IOException */ - private static ReefServiceProtos.FileResourceProto getFileResourceProto(final String fileName, final ReefServiceProtos.FileType type) throws IOException { + private static FileResource getFileResourceProto(final String fileName, final FileType type) throws IOException { File file = new File(fileName); if (file.exists()) { // It is a local file and can be added. @@ -138,7 +140,7 @@ final class JobSubmissionHelper { // If it is a directory, create a JAR file of it and add that instead. file = toJar(file); } - return ReefServiceProtos.FileResourceProto.newBuilder() + return FileResourceImpl.newBuilder() .setName(file.getName()) .setPath(file.getPath()) .setType(type) @@ -150,7 +152,7 @@ final class JobSubmissionHelper { final URI uri = new URI(fileName); final String path = uri.getPath(); final String name = path.substring(path.lastIndexOf('/') + 1); - return ReefServiceProtos.FileResourceProto.newBuilder() + return FileResourceImpl.newBuilder() .setName(name) .setPath(uri.toString()) .setType(type) http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/REEFImplementation.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/REEFImplementation.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/REEFImplementation.java index 42bf1fd..3dad568 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/REEFImplementation.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/REEFImplementation.java @@ -23,12 +23,12 @@ import org.apache.reef.annotations.audience.ClientSide; import org.apache.reef.annotations.audience.Private; import org.apache.reef.client.REEF; import org.apache.reef.client.parameters.DriverConfigurationProviders; -import org.apache.reef.tang.ConfigurationProvider; -import org.apache.reef.proto.ClientRuntimeProtocol.JobSubmissionProto; +import org.apache.reef.runtime.common.client.api.JobSubmissionEvent; import org.apache.reef.runtime.common.client.api.JobSubmissionHandler; import org.apache.reef.runtime.common.launch.parameters.ErrorHandlerRID; import org.apache.reef.tang.Configuration; import org.apache.reef.tang.ConfigurationBuilder; +import org.apache.reef.tang.ConfigurationProvider; import org.apache.reef.tang.Tang; import org.apache.reef.tang.annotations.Name; import org.apache.reef.tang.annotations.NamedParameter; @@ -93,14 +93,14 @@ public final class REEFImplementation implements REEF { public void submit(final Configuration driverConf) { try (LoggingScope ls = this.loggingScopeFactory.reefSubmit()) { final Configuration driverConfiguration = createDriverConfiguration(driverConf); - final JobSubmissionProto submissionMessage; + final JobSubmissionEvent submissionMessage; try { if (this.clientWireUp.isClientPresent()) { - submissionMessage = this.jobSubmissionHelper.getJobsubmissionProto(driverConfiguration) + submissionMessage = this.jobSubmissionHelper.getJobSubmissionBuilder(driverConf) .setRemoteId(this.clientWireUp.getRemoteManagerIdentifier()) .build(); } else { - submissionMessage = this.jobSubmissionHelper.getJobsubmissionProto(driverConfiguration) + submissionMessage = this.jobSubmissionHelper.getJobSubmissionBuilder(driverConf) .setRemoteId(ErrorHandlerRID.NONE) .build(); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEvent.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEvent.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEvent.java new file mode 100644 index 0000000..ac45b05 --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEvent.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.common.client.api; + +import org.apache.reef.annotations.audience.RuntimeAuthor; +import org.apache.reef.runtime.common.files.FileResource; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.annotations.DefaultImplementation; +import org.apache.reef.util.Optional; + +import java.util.Set; + +/** + * Event sent to Driver Runtime + * Submission of a Job to the Driver Runtime + */ +@RuntimeAuthor +@DefaultImplementation(JobSubmissionEventImpl.class) +public interface JobSubmissionEvent { + + /** + * @return Id of the Job + */ + String getIdentifier(); + + /** + * @return Remote Id for the error handler + */ + String getRemoteId(); + + /** + * @return Driver configuration + */ + Configuration getConfiguration(); + + /** + * @return Owner's username + */ + String getUserName(); + + /** + * @return List of global files + */ + Set<FileResource> getGlobalFileSet(); + + /** + * @return List of local files + */ + Set<FileResource> getLocalFileSet(); + + /** + * @return Memory to be allocated to the Driver + */ + Optional<Integer> getDriverMemory(); + + /** + * @return Priority to be given to the Job + */ + Optional<Integer> getPriority(); + + /** + * @return Queue to submit the Job to + */ + Optional<String> getQueue(); +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEventImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEventImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEventImpl.java new file mode 100644 index 0000000..e76f554 --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionEventImpl.java @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.common.client.api; + +import org.apache.reef.runtime.common.files.FileResource; +import org.apache.reef.tang.Configuration; +import org.apache.reef.util.BuilderUtils; +import org.apache.reef.util.Optional; + +import java.util.HashSet; +import java.util.Set; + +/** + * Default POJO implementation of JobSubmissionEvent. + * Use newBuilder to construct an instance. + */ +public final class JobSubmissionEventImpl implements JobSubmissionEvent { + private final String identifier; + private final String remoteId; + private final Configuration configuration; + private final String userName; + private final Set<FileResource> globalFileSet; + private final Set<FileResource> localFileSet; + private final Optional<Integer> driverMemory; + private final Optional<Integer> priority; + private final Optional<String> queue; + + private JobSubmissionEventImpl(final Builder builder) { + this.identifier = BuilderUtils.notNull(builder.identifier); + this.remoteId = BuilderUtils.notNull(builder.remoteId); + this.configuration = BuilderUtils.notNull(builder.configuration); + this.userName = BuilderUtils.notNull(builder.userName); + this.globalFileSet = BuilderUtils.notNull(builder.globalFileSet); + this.localFileSet = BuilderUtils.notNull(builder.localFileSet); + this.driverMemory = Optional.ofNullable(builder.driverMemory); + this.priority = Optional.ofNullable(builder.priority); + this.queue = Optional.ofNullable(builder.queue); + } + + @Override + public String getIdentifier() { + return identifier; + } + + @Override + public String getRemoteId() { + return remoteId; + } + + @Override + public Configuration getConfiguration() { + return configuration; + } + + @Override + public String getUserName() { + return userName; + } + + @Override + public Set<FileResource> getGlobalFileSet() { + return globalFileSet; + } + + @Override + public Set<FileResource> getLocalFileSet() { + return localFileSet; + } + + @Override + public Optional<Integer> getDriverMemory() { + return driverMemory; + } + + @Override + public Optional<Integer> getPriority() { + return priority; + } + + @Override + public Optional<String> getQueue() { + return queue; + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder used to create JobSubmissionEvent instances. + */ + public static final class Builder implements org.apache.reef.util.Builder<JobSubmissionEvent> { + private String identifier; + private String remoteId; + private Configuration configuration; + private String userName; + private Set<FileResource> globalFileSet = new HashSet<>(); + private Set<FileResource> localFileSet = new HashSet<>(); + private Integer driverMemory; + private Integer priority; + private String queue; + + /** + * @see JobSubmissionEvent#getIdentifier() + */ + public Builder setIdentifier(final String identifier) { + this.identifier = identifier; + return this; + } + + /** + * @see JobSubmissionEvent#getRemoteId() + */ + public Builder setRemoteId(final String remoteId) { + this.remoteId = remoteId; + return this; + } + + /** + * @see JobSubmissionEvent#getConfiguration() + */ + public Builder setConfiguration(final Configuration configuration) { + this.configuration = configuration; + return this; + } + + /** + * @see JobSubmissionEvent#getUserName() + */ + public Builder setUserName(final String userName) { + this.userName = userName; + return this; + } + + /** + * Add an entry to the globalFileSet + * @see JobSubmissionEvent#getGlobalFileSet() + */ + public Builder addGlobalFile(final FileResource globalFile) { + this.globalFileSet.add(globalFile); + return this; + } + + /** + * Add an entry to the localFileSet + * @see JobSubmissionEvent#getLocalFileSet() + */ + public Builder addLocalFile(final FileResource localFile) { + this.localFileSet.add(localFile); + return this; + } + + /** + * @see JobSubmissionEvent#getDriverMemory() + */ + public Builder setDriverMemory(final Integer driverMemory) { + this.driverMemory = driverMemory; + return this; + } + + /** + * @see JobSubmissionEvent#getPriority() + */ + public Builder setPriority(final Integer priority) { + this.priority = priority; + return this; + } + + /** + * @see JobSubmissionEvent#getQueue() + */ + public Builder setQueue(final String queue) { + this.queue = queue; + return this; + } + + @Override + public JobSubmissionEvent build() { + return new JobSubmissionEventImpl(this); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionHandler.java index af2d064..ae80ba0 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionHandler.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/client/api/JobSubmissionHandler.java @@ -19,11 +19,10 @@ package org.apache.reef.runtime.common.client.api; import org.apache.reef.annotations.audience.RuntimeAuthor; -import org.apache.reef.proto.ClientRuntimeProtocol.JobSubmissionProto; import org.apache.reef.wake.EventHandler; @RuntimeAuthor -public interface JobSubmissionHandler extends EventHandler<JobSubmissionProto>, AutoCloseable { +public interface JobSubmissionHandler extends EventHandler<JobSubmissionEvent>, AutoCloseable { @Override public void close(); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java index 940de3c..bff27ab 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java @@ -23,7 +23,7 @@ import org.apache.reef.driver.catalog.RackDescriptor; import org.apache.reef.driver.catalog.ResourceCatalog; import org.apache.reef.driver.evaluator.EvaluatorRequest; import org.apache.reef.driver.evaluator.EvaluatorRequestor; -import org.apache.reef.proto.DriverRuntimeProtocol; +import org.apache.reef.runtime.common.driver.api.ResourceRequestEventImpl; import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler; import org.apache.reef.util.logging.LoggingScope; import org.apache.reef.util.logging.LoggingScopeFactory; @@ -72,7 +72,7 @@ public final class EvaluatorRequestorImpl implements EvaluatorRequestor { } try (LoggingScope ls = loggingScopeFactory.evaluatorSubmit(req.getNumber())) { - final DriverRuntimeProtocol.ResourceRequestProto.Builder request = DriverRuntimeProtocol.ResourceRequestProto + final ResourceRequestEventImpl.Builder request = ResourceRequestEventImpl .newBuilder() .setResourceCount(req.getNumber()) .setVirtualCores(req.getNumberOfCores()) http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceLaunchEvent.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceLaunchEvent.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceLaunchEvent.java new file mode 100644 index 0000000..8a04810 --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceLaunchEvent.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.common.driver.api; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.RuntimeAuthor; +import org.apache.reef.runtime.common.files.FileResource; +import org.apache.reef.runtime.common.launch.ProcessType; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.annotations.DefaultImplementation; + +import java.util.Set; + +/** + * Event from Driver Process -> Driver Runtime + * A request to the Driver Runtime to launch an Evaluator on the allocated Resource + */ +@RuntimeAuthor +@DriverSide +@DefaultImplementation(ResourceLaunchEventImpl.class) +public interface ResourceLaunchEvent { + + /** + * @return Id of the resource to launch the Evaluator on + */ + String getIdentifier(); + + /** + * @return Remote Id for the error handler + */ + String getRemoteId(); + + /** + * @return Evaluator configuration + */ + Configuration getEvaluatorConf(); + + /** + * @return Type of process to launch + */ + ProcessType getType(); + + /** + * @return List of libraries local to this Evaluator + */ + Set<FileResource> getFileSet(); +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceLaunchEventImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceLaunchEventImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceLaunchEventImpl.java new file mode 100644 index 0000000..27a024b --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceLaunchEventImpl.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.common.driver.api; + +import org.apache.reef.runtime.common.files.FileResource; +import org.apache.reef.runtime.common.launch.ProcessType; +import org.apache.reef.tang.Configuration; +import org.apache.reef.util.BuilderUtils; + +import java.util.HashSet; +import java.util.Set; + +/** + * Default POJO implementation of ResourceLaunchEvent. + * Use newBuilder to construct an instance. + */ +public final class ResourceLaunchEventImpl implements ResourceLaunchEvent { + + private final String identifier; + private final String remoteId; + private final Configuration evaluatorConf; + private final ProcessType type; + private final Set<FileResource> fileSet; + + private ResourceLaunchEventImpl(final Builder builder) { + this.identifier = BuilderUtils.notNull(builder.identifier); + this.remoteId = BuilderUtils.notNull(builder.remoteId); + this.evaluatorConf = BuilderUtils.notNull(builder.evaluatorConf); + this.type = BuilderUtils.notNull(builder.type); + this.fileSet = BuilderUtils.notNull(builder.fileSet); + } + + @Override + public String getIdentifier() { + return identifier; + } + + @Override + public String getRemoteId() { + return remoteId; + } + + @Override + public Configuration getEvaluatorConf() { + return evaluatorConf; + } + + @Override + public ProcessType getType() { + return type; + } + + @Override + public Set<FileResource> getFileSet() { + return fileSet; + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder used to create ResourceLaunchEvent instances. + */ + public static final class Builder implements org.apache.reef.util.Builder<ResourceLaunchEvent> { + private String identifier; + private String remoteId; + private Configuration evaluatorConf; + private ProcessType type; + private Set<FileResource> fileSet = new HashSet<>(); + + /** + * @see ResourceLaunchEvent#getIdentifier() + */ + public Builder setIdentifier(final String identifier) { + this.identifier = identifier; + return this; + } + + /** + * @see ResourceLaunchEvent#getRemoteId() + */ + public Builder setRemoteId(final String remoteId) { + this.remoteId = remoteId; + return this; + } + + /** + * @see ResourceLaunchEvent#getEvaluatorConf() + */ + public Builder setEvaluatorConf(final Configuration evaluatorConf) { + this.evaluatorConf = evaluatorConf; + return this; + } + + /** + * @see ResourceLaunchEvent#getType() + */ + public Builder setType(final ProcessType type) { + this.type = type; + return this; + } + + /** + * Add an entry to the fileSet + * @see ResourceLaunchEvent#getFileSet() + */ + public Builder addFile(final FileResource file) { + this.fileSet.add(file); + return this; + } + + @Override + public ResourceLaunchEvent build() { + return new ResourceLaunchEventImpl(this); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceLaunchHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceLaunchHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceLaunchHandler.java index 65c3830..d1e2f62 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceLaunchHandler.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceLaunchHandler.java @@ -19,12 +19,11 @@ package org.apache.reef.runtime.common.driver.api; import org.apache.reef.annotations.audience.RuntimeAuthor; -import org.apache.reef.proto.DriverRuntimeProtocol; import org.apache.reef.wake.EventHandler; /** * */ @RuntimeAuthor -public interface ResourceLaunchHandler extends EventHandler<DriverRuntimeProtocol.ResourceLaunchProto> { +public interface ResourceLaunchHandler extends EventHandler<ResourceLaunchEvent> { } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceReleaseEvent.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceReleaseEvent.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceReleaseEvent.java new file mode 100644 index 0000000..d46aefa --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceReleaseEvent.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.common.driver.api; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.RuntimeAuthor; +import org.apache.reef.tang.annotations.DefaultImplementation; + +/** + * Event from Driver Process -> Driver Runtime + * A request to the Driver Runtime to release this resource + */ +@RuntimeAuthor +@DriverSide +@DefaultImplementation(ResourceReleaseEventImpl.class) +public interface ResourceReleaseEvent { + /** + * @return Id of the resource to release + */ + String getIdentifier(); +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceReleaseEventImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceReleaseEventImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceReleaseEventImpl.java new file mode 100644 index 0000000..667d9a3 --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceReleaseEventImpl.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.common.driver.api; + +import org.apache.reef.util.BuilderUtils; + +/** + * Default POJO implementation of ResourceReleaseEvent. + * Use newBuilder to construct an instance. + */ +public final class ResourceReleaseEventImpl implements ResourceReleaseEvent { + + private final String identifier; + + private ResourceReleaseEventImpl(final Builder builder) { + this.identifier = BuilderUtils.notNull(builder.identifier); + } + + @Override + public String getIdentifier() { + return identifier; + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder used to create ResourceReleaseEvent instances. + */ + public static final class Builder implements org.apache.reef.util.Builder<ResourceReleaseEvent> { + + private String identifier; + + /** + * @see ResourceReleaseEvent#getIdentifier() + */ + public Builder setIdentifier(final String identifier) { + this.identifier = identifier; + return this; + } + + @Override + public ResourceReleaseEvent build() { + return new ResourceReleaseEventImpl(this); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceReleaseHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceReleaseHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceReleaseHandler.java index 581eb63..8d55c50 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceReleaseHandler.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceReleaseHandler.java @@ -19,12 +19,11 @@ package org.apache.reef.runtime.common.driver.api; import org.apache.reef.annotations.audience.RuntimeAuthor; -import org.apache.reef.proto.DriverRuntimeProtocol; import org.apache.reef.wake.EventHandler; /** * */ @RuntimeAuthor -public interface ResourceReleaseHandler extends EventHandler<DriverRuntimeProtocol.ResourceReleaseProto> { +public interface ResourceReleaseHandler extends EventHandler<ResourceReleaseEvent> { } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestEvent.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestEvent.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestEvent.java new file mode 100644 index 0000000..c843219 --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestEvent.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.common.driver.api; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.RuntimeAuthor; +import org.apache.reef.tang.annotations.DefaultImplementation; +import org.apache.reef.util.Optional; + +import java.util.List; + +/** + * Event from Driver Process -> Driver Runtime + * A request to the Driver Runtime to allocate resources with the given specification + */ +@RuntimeAuthor +@DriverSide +@DefaultImplementation(ResourceRequestEventImpl.class) +public interface ResourceRequestEvent { + + /** + * @return The number of resources requested + */ + int getResourceCount(); + + /** + * @return A list of preferred nodes to place the resource on. + * An empty list indicates all nodes are equally preferred. + */ + List<String> getNodeNameList(); + + /** + * @return A list of preferred racks to place the resource on, + * An empty list indicates all racks are equally preferred. + */ + List<String> getRackNameList(); + + /** + * @return The amount of memory to allocate to the resource + */ + Optional<Integer> getMemorySize(); + + /** + * @return The priority assigned to the resource + */ + Optional<Integer> getPriority(); + + /** + * @return The number of virtual CPU cores to allocate for the resource + */ + Optional<Integer> getVirtualCores(); + + /** + * @return If true, allow allocation on nodes and racks other than + * the preferred list. If false, strictly enforce the preferences. + */ + Optional<Boolean> getRelaxLocality(); +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestEventImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestEventImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestEventImpl.java new file mode 100644 index 0000000..1605118 --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestEventImpl.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.common.driver.api; + +import org.apache.reef.util.BuilderUtils; +import org.apache.reef.util.Optional; + +import java.util.ArrayList; +import java.util.List; + +/** + * Default POJO implementation of ResourceRequestEvent. + * Use newBuilder to construct an instance. + */ +public final class ResourceRequestEventImpl implements ResourceRequestEvent { + private final int resourceCount; + private final List<String> nodeNameList; + private final List<String> rackNameList; + private final Optional<Integer> memorySize; + private final Optional<Integer> priority; + private final Optional<Integer> virtualCores; + private final Optional<Boolean> relaxLocality; + + private ResourceRequestEventImpl(final Builder builder) { + this.resourceCount = BuilderUtils.notNull(builder.resourceCount); + this.nodeNameList = BuilderUtils.notNull(builder.nodeNameList); + this.rackNameList = BuilderUtils.notNull(builder.rackNameList); + this.memorySize = Optional.ofNullable(builder.memorySize); + this.priority = Optional.ofNullable(builder.priority); + this.virtualCores = Optional.ofNullable(builder.virtualCores); + this.relaxLocality = Optional.ofNullable(builder.relaxLocality); + } + + @Override + public int getResourceCount() { + return resourceCount; + } + + @Override + public List<String> getNodeNameList() { + return nodeNameList; + } + + @Override + public List<String> getRackNameList() { + return rackNameList; + } + + @Override + public Optional<Integer> getMemorySize() { + return memorySize; + } + + @Override + public Optional<Integer> getPriority() { + return priority; + } + + @Override + public Optional<Integer> getVirtualCores() { + return virtualCores; + } + + @Override + public Optional<Boolean> getRelaxLocality() { + return relaxLocality; + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder used to create ResourceRequestEvent instances. + */ + public static final class Builder implements org.apache.reef.util.Builder<ResourceRequestEvent> { + private Integer resourceCount; + private List<String> nodeNameList = new ArrayList<>(); + private List<String> rackNameList = new ArrayList<>(); + private Integer memorySize; + private Integer priority; + private Integer virtualCores; + private Boolean relaxLocality; + + /** + * Create a builder from an existing ResourceRequestEvent + */ + public Builder mergeFrom(final ResourceRequestEvent resourceRequestEvent) { + this.resourceCount = resourceRequestEvent.getResourceCount(); + this.nodeNameList = resourceRequestEvent.getNodeNameList(); + this.rackNameList = resourceRequestEvent.getRackNameList(); + this.memorySize = resourceRequestEvent.getMemorySize().orElse(null); + this.priority = resourceRequestEvent.getPriority().orElse(null); + this.virtualCores = resourceRequestEvent.getVirtualCores().orElse(null); + this.relaxLocality = resourceRequestEvent.getRelaxLocality().orElse(null); + return this; + } + + /** + * @see ResourceRequestEvent#getResourceCount() + */ + public Builder setResourceCount(final int resourceCount) { + this.resourceCount = resourceCount; + return this; + } + + /** + * Add an entry to the nodeNameList + * @see ResourceRequestEvent#getNodeNameList() + */ + public Builder addNodeName(final String nodeName) { + this.nodeNameList.add(nodeName); + return this; + } + + /** + * Add an entry to rackNameList + * @see ResourceRequestEvent#getRackNameList() + */ + public Builder addRackName(final String rackName) { + this.rackNameList.add(rackName); + return this; + } + + /** + * @see ResourceRequestEvent#getMemorySize() + */ + public Builder setMemorySize(final int memorySize) { + this.memorySize = memorySize; + return this; + } + + /** + * @see ResourceRequestEvent#getPriority() + */ + public Builder setPriority(final int priority) { + this.priority = priority; + return this; + } + + /** + * @see ResourceRequestEvent#getVirtualCores() + */ + public Builder setVirtualCores(final int virtualCores) { + this.virtualCores = virtualCores; + return this; + } + + /** + * @see ResourceRequestEvent#getRelaxLocality() + */ + public Builder setRelaxLocality(final boolean relaxLocality) { + this.relaxLocality = relaxLocality; + return this; + } + + @Override + public ResourceRequestEvent build() { + return new ResourceRequestEventImpl(this); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestHandler.java index 3d30ec8..6faf938 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestHandler.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestHandler.java @@ -19,12 +19,11 @@ package org.apache.reef.runtime.common.driver.api; import org.apache.reef.annotations.audience.RuntimeAuthor; -import org.apache.reef.proto.DriverRuntimeProtocol; import org.apache.reef.wake.EventHandler; /** * The evaluator request handler. */ @RuntimeAuthor -public interface ResourceRequestHandler extends EventHandler<DriverRuntimeProtocol.ResourceRequestProto> { +public interface ResourceRequestHandler extends EventHandler<ResourceRequestEvent> { } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/RuntimeParameters.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/RuntimeParameters.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/RuntimeParameters.java index e847249..9f45b4f 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/RuntimeParameters.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/RuntimeParameters.java @@ -19,7 +19,10 @@ package org.apache.reef.runtime.common.driver.api; import org.apache.reef.annotations.audience.RuntimeAuthor; -import org.apache.reef.proto.DriverRuntimeProtocol; +import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEvent; +import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent; +import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent; +import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEvent; import org.apache.reef.tang.annotations.Name; import org.apache.reef.tang.annotations.NamedParameter; import org.apache.reef.wake.EventHandler; @@ -32,19 +35,19 @@ import org.apache.reef.wake.EventHandler; public final class RuntimeParameters { @NamedParameter(doc = "The resource allocation handler that stub runtimes send along allocated resources e.g., containers.") - public final static class ResourceAllocationHandler implements Name<EventHandler<DriverRuntimeProtocol.ResourceAllocationProto>> { + public final static class ResourceAllocationHandler implements Name<EventHandler<ResourceAllocationEvent>> { } @NamedParameter(doc = "The node descriptor handler that stub runtimes send along node information.") - public final static class NodeDescriptorHandler implements Name<EventHandler<DriverRuntimeProtocol.NodeDescriptorProto>> { + public final static class NodeDescriptorHandler implements Name<EventHandler<NodeDescriptorEvent>> { } @NamedParameter(doc = "The resource status handler.") - public final static class ResourceStatusHandler implements Name<EventHandler<DriverRuntimeProtocol.ResourceStatusProto>> { + public final static class ResourceStatusHandler implements Name<EventHandler<ResourceStatusEvent>> { } @NamedParameter(doc = "The resourcemanager status handler.") - public final static class RuntimeStatusHandler implements Name<EventHandler<DriverRuntimeProtocol.RuntimeStatusProto>> { + public final static class RuntimeStatusHandler implements Name<EventHandler<RuntimeStatusEvent>> { } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/ResourceCatalogImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/ResourceCatalogImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/ResourceCatalogImpl.java index 6193af4..b01d652 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/ResourceCatalogImpl.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/ResourceCatalogImpl.java @@ -22,7 +22,7 @@ import org.apache.reef.annotations.audience.Private; import org.apache.reef.driver.catalog.NodeDescriptor; import org.apache.reef.driver.catalog.RackDescriptor; import org.apache.reef.driver.catalog.ResourceCatalog; -import org.apache.reef.proto.DriverRuntimeProtocol.NodeDescriptorProto; +import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEvent; import javax.inject.Inject; import java.net.InetSocketAddress; @@ -68,8 +68,8 @@ public final class ResourceCatalogImpl implements ResourceCatalog { return this.nodes.get(id); } - public synchronized final void handle(final NodeDescriptorProto node) { - final String rack_name = (node.hasRackName() ? node.getRackName() : DEFAULT_RACK); + public synchronized final void handle(final NodeDescriptorEvent node) { + final String rack_name = node.getRackName().orElse(DEFAULT_RACK); LOG.log(Level.FINEST, "Catalog new node: id[{0}], rack[{1}], host[{2}], port[{3}], memory[{4}]", new Object[]{node.getIdentifier(), rack_name, node.getHostName(), node.getPort(), http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/AllocatedEvaluatorImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/AllocatedEvaluatorImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/AllocatedEvaluatorImpl.java index d96d369..bce6aae 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/AllocatedEvaluatorImpl.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/AllocatedEvaluatorImpl.java @@ -25,9 +25,11 @@ import org.apache.reef.driver.context.ContextConfiguration; import org.apache.reef.driver.evaluator.AllocatedEvaluator; import org.apache.reef.driver.evaluator.EvaluatorDescriptor; import org.apache.reef.driver.evaluator.EvaluatorType; -import org.apache.reef.proto.DriverRuntimeProtocol; -import org.apache.reef.proto.ReefServiceProtos; +import org.apache.reef.runtime.common.driver.api.ResourceLaunchEventImpl; import org.apache.reef.runtime.common.evaluator.EvaluatorConfiguration; +import org.apache.reef.runtime.common.files.FileResourceImpl; +import org.apache.reef.runtime.common.files.FileType; +import org.apache.reef.runtime.common.launch.ProcessType; import org.apache.reef.tang.Configuration; import org.apache.reef.tang.ConfigurationBuilder; import org.apache.reef.tang.Tang; @@ -184,35 +186,35 @@ final class AllocatedEvaluatorImpl implements AllocatedEvaluator { evaluatorConfiguration = contextConfigurationModule.build(); } - final DriverRuntimeProtocol.ResourceLaunchProto.Builder rbuilder = - DriverRuntimeProtocol.ResourceLaunchProto.newBuilder() + final ResourceLaunchEventImpl.Builder rbuilder = + ResourceLaunchEventImpl.newBuilder() .setIdentifier(this.evaluatorManager.getId()) .setRemoteId(this.remoteID) - .setEvaluatorConf(configurationSerializer.toString(evaluatorConfiguration)); + .setEvaluatorConf(evaluatorConfiguration); for (final File file : this.files) { - rbuilder.addFile(ReefServiceProtos.FileResourceProto.newBuilder() - .setName(file.getName()) - .setPath(file.getPath()) - .setType(ReefServiceProtos.FileType.PLAIN) - .build()); + rbuilder.addFile(FileResourceImpl.newBuilder() + .setName(file.getName()) + .setPath(file.getPath()) + .setType(FileType.PLAIN) + .build()); } for (final File lib : this.libraries) { - rbuilder.addFile(ReefServiceProtos.FileResourceProto.newBuilder() - .setName(lib.getName()) - .setPath(lib.getPath().toString()) - .setType(ReefServiceProtos.FileType.LIB) - .build()); + rbuilder.addFile(FileResourceImpl.newBuilder() + .setName(lib.getName()) + .setPath(lib.getPath().toString()) + .setType(FileType.LIB) + .build()); } { // Set the type switch (this.evaluatorManager.getEvaluatorDescriptor().getType()) { case CLR: - rbuilder.setType(ReefServiceProtos.ProcessType.CLR); + rbuilder.setType(ProcessType.CLR); break; default: - rbuilder.setType(ReefServiceProtos.ProcessType.JVM); + rbuilder.setType(ProcessType.JVM); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java index ce148b8..0c2edb0 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java @@ -31,16 +31,18 @@ import org.apache.reef.driver.task.FailedTask; import org.apache.reef.exception.EvaluatorException; import org.apache.reef.exception.EvaluatorKilledByResourceManagerException; import org.apache.reef.io.naming.Identifiable; -import org.apache.reef.proto.DriverRuntimeProtocol; import org.apache.reef.proto.EvaluatorRuntimeProtocol; import org.apache.reef.proto.ReefServiceProtos; import org.apache.reef.runtime.common.DriverRestartCompleted; import org.apache.reef.runtime.common.driver.DriverStatusManager; +import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent; +import org.apache.reef.runtime.common.driver.api.ResourceReleaseEventImpl; import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler; import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler; import org.apache.reef.runtime.common.driver.context.ContextControlHandler; import org.apache.reef.runtime.common.driver.context.ContextRepresenters; import org.apache.reef.runtime.common.driver.idle.EventHandlerIdlenessSource; +import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent; import org.apache.reef.runtime.common.driver.task.TaskRepresenter; import org.apache.reef.runtime.common.utils.ExceptionCodec; import org.apache.reef.runtime.common.utils.RemoteManager; @@ -162,10 +164,10 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { return "REEF_LOCAL_RUNTIME"; } - private static boolean isDoneOrFailedOrKilled(final DriverRuntimeProtocol.ResourceStatusProto resourceStatusProto) { - return resourceStatusProto.getState() == ReefServiceProtos.State.DONE || - resourceStatusProto.getState() == ReefServiceProtos.State.FAILED || - resourceStatusProto.getState() == ReefServiceProtos.State.KILLED; + private static boolean isDoneOrFailedOrKilled(final ResourceStatusEvent resourceStatusEvent) { + return resourceStatusEvent.getState() == ReefServiceProtos.State.DONE || + resourceStatusEvent.getState() == ReefServiceProtos.State.FAILED || + resourceStatusEvent.getState() == ReefServiceProtos.State.KILLED; } @Override @@ -210,7 +212,7 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { @Override public void onNext(final Alarm alarm) { EvaluatorManager.this.resourceReleaseHandler.onNext( - DriverRuntimeProtocol.ResourceReleaseProto.newBuilder() + ResourceReleaseEventImpl.newBuilder() .setIdentifier(EvaluatorManager.this.evaluatorId).build() ); } @@ -218,7 +220,7 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { } catch (final IllegalStateException e) { LOG.log(Level.WARNING, "Force resource release because the client closed the clock.", e); EvaluatorManager.this.resourceReleaseHandler.onNext( - DriverRuntimeProtocol.ResourceReleaseProto.newBuilder() + ResourceReleaseEventImpl.newBuilder() .setIdentifier(EvaluatorManager.this.evaluatorId).build() ); } @@ -404,11 +406,11 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { onEvaluatorException(evaluatorException); } - public void onResourceLaunch(final DriverRuntimeProtocol.ResourceLaunchProto resourceLaunchProto) { + public void onResourceLaunch(final ResourceLaunchEvent resourceLaunchEvent) { synchronized (this.evaluatorDescriptor) { if (this.stateManager.isAllocated()) { this.stateManager.setSubmitted(); - this.resourceLaunchHandler.onNext(resourceLaunchProto); + this.resourceLaunchHandler.onNext(resourceLaunchEvent); } else { throw new RuntimeException("Evaluator manager expected " + EvaluatorState.ALLOCATED + " state but instead is in state " + this.stateManager); @@ -475,13 +477,13 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { /** * Resource status information from the (actual) resource manager. */ - public void onResourceStatusMessage(final DriverRuntimeProtocol.ResourceStatusProto resourceStatusProto) { + public void onResourceStatusMessage(final ResourceStatusEvent resourceStatusEvent) { synchronized (this.evaluatorDescriptor) { - LOG.log(Level.FINEST, "Resource manager state update: {0}", resourceStatusProto.getState()); + LOG.log(Level.FINEST, "Resource manager state update: {0}", resourceStatusEvent.getState()); if (this.stateManager.isDoneOrFailedOrKilled()) { LOG.log(Level.FINE, "Ignoring resource status update for Evaluator {0} which is already in state {1}.", new Object[]{this.getId(), this.stateManager}); - } else if (isDoneOrFailedOrKilled(resourceStatusProto) && this.stateManager.isAllocatedOrSubmittedOrRunning()) { + } else if (isDoneOrFailedOrKilled(resourceStatusEvent) && this.stateManager.isAllocatedOrSubmittedOrRunning()) { // something is wrong. The resource manager reports that the Evaluator is done or failed, but the Driver assumes // it to be alive. final StringBuilder messageBuilder = new StringBuilder("Evaluator [") @@ -489,7 +491,7 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { .append("] is assumed to be in state [") .append(this.stateManager.toString()) .append("]. But the resource manager reports it to be in state [") - .append(resourceStatusProto.getState()) + .append(resourceStatusEvent.getState()) .append("]."); if (this.stateManager.isSubmitted()) { @@ -507,7 +509,7 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { } this.isResourceReleased = true; - if (resourceStatusProto.getState() == ReefServiceProtos.State.KILLED) { + if (resourceStatusEvent.getState() == ReefServiceProtos.State.KILLED) { this.onEvaluatorException(new EvaluatorKilledByResourceManagerException(this.evaluatorId, messageBuilder.toString())); } else { this.onEvaluatorException(new EvaluatorException(this.evaluatorId, messageBuilder.toString())); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java index d6e7757..e11070b 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java @@ -23,8 +23,9 @@ import org.apache.reef.annotations.audience.Private; import org.apache.reef.driver.catalog.NodeDescriptor; import org.apache.reef.driver.catalog.ResourceCatalog; import org.apache.reef.driver.evaluator.EvaluatorType; -import org.apache.reef.proto.DriverRuntimeProtocol; import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorHandler; +import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent; +import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent; import org.apache.reef.tang.Injector; import org.apache.reef.tang.exceptions.BindException; import org.apache.reef.tang.exceptions.InjectionException; @@ -80,26 +81,26 @@ public final class EvaluatorManagerFactory { /** * Instantiates a new EvaluatorManager based on a resource allocation. * - * @param resourceAllocationProto + * @param resourceAllocationEvent * @return */ - public final EvaluatorManager getNewEvaluatorManager(final DriverRuntimeProtocol.ResourceAllocationProto resourceAllocationProto) { - final NodeDescriptor nodeDescriptor = this.resourceCatalog.getNode(resourceAllocationProto.getNodeId()); + public final EvaluatorManager getNewEvaluatorManager(final ResourceAllocationEvent resourceAllocationEvent) { + final NodeDescriptor nodeDescriptor = this.resourceCatalog.getNode(resourceAllocationEvent.getNodeId()); if (nodeDescriptor == null) { - throw new RuntimeException("Unknown resource: " + resourceAllocationProto.getNodeId()); + throw new RuntimeException("Unknown resource: " + resourceAllocationEvent.getNodeId()); } final EvaluatorDescriptorImpl evaluatorDescriptor = new EvaluatorDescriptorImpl(nodeDescriptor, - EvaluatorType.UNDECIDED, resourceAllocationProto.getResourceMemory(), resourceAllocationProto.getVirtualCores()); + EvaluatorType.UNDECIDED, resourceAllocationEvent.getResourceMemory(), resourceAllocationEvent.getVirtualCores().get()); - LOG.log(Level.FINEST, "Resource allocation: new evaluator id[{0}]", resourceAllocationProto.getIdentifier()); - return this.getNewEvaluatorManagerInstance(resourceAllocationProto.getIdentifier(), evaluatorDescriptor); + LOG.log(Level.FINEST, "Resource allocation: new evaluator id[{0}]", resourceAllocationEvent.getIdentifier()); + return this.getNewEvaluatorManagerInstance(resourceAllocationEvent.getIdentifier(), evaluatorDescriptor); } - public final EvaluatorManager createForEvaluatorFailedDuringDriverRestart(final DriverRuntimeProtocol.ResourceStatusProto resourceStatusProto) { - if (!resourceStatusProto.getIsFromPreviousDriver()) { - throw new RuntimeException("Invalid resourceStatusProto, must be status for resource from previous Driver."); + public final EvaluatorManager createForEvaluatorFailedDuringDriverRestart(final ResourceStatusEvent resourceStatusEvent) { + if (!resourceStatusEvent.getIsFromPreviousDriver().get()) { + throw new RuntimeException("Invalid resourceStatusEvent, must be status for resource from previous Driver."); } - return getNewEvaluatorManagerInstance(resourceStatusProto.getIdentifier(), new EvaluatorDescriptorImpl(null, EvaluatorType.UNDECIDED, 128, 1)); + return getNewEvaluatorManagerInstance(resourceStatusEvent.getIdentifier(), new EvaluatorDescriptorImpl(null, EvaluatorType.UNDECIDED, 128, 1)); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java index dde4e70..664e151 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java @@ -20,7 +20,7 @@ package org.apache.reef.runtime.common.driver.evaluator; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; -import org.apache.reef.proto.DriverRuntimeProtocol; +import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent; import org.apache.reef.util.Optional; import org.apache.reef.util.SingletonAsserter; @@ -106,7 +106,7 @@ public final class Evaluators implements AutoCloseable { */ public synchronized void put( final EvaluatorManagerFactory evaluatorManagerFactory, - final DriverRuntimeProtocol.ResourceAllocationProto evaluatorMsg) { + final ResourceAllocationEvent evaluatorMsg) { this.put(evaluatorManagerFactory.getNewEvaluatorManager(evaluatorMsg)); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/NodeDescriptorEvent.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/NodeDescriptorEvent.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/NodeDescriptorEvent.java new file mode 100644 index 0000000..0ddaafb --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/NodeDescriptorEvent.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.common.driver.resourcemanager; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.RuntimeAuthor; +import org.apache.reef.tang.annotations.DefaultImplementation; +import org.apache.reef.util.Optional; + +/** + * Event from Driver Runtime -> Driver Process + * Description of a node in the cluster + */ +@RuntimeAuthor +@DriverSide +@DefaultImplementation(NodeDescriptorEventImpl.class) +public interface NodeDescriptorEvent { + + /** + * @return Id of the node + */ + String getIdentifier(); + + /** + * @return Host name of the node + */ + String getHostName(); + + /** + * @return Port of the node + */ + int getPort(); + + /** + * @return Total memory size of the node, in MB + */ + int getMemorySize(); + + /** + * @return Name of rack where the node is located + */ + Optional<String> getRackName(); +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/NodeDescriptorEventImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/NodeDescriptorEventImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/NodeDescriptorEventImpl.java new file mode 100644 index 0000000..5e93820 --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/NodeDescriptorEventImpl.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.common.driver.resourcemanager; + +import org.apache.reef.util.BuilderUtils; +import org.apache.reef.util.Optional; + +/** + * Default POJO implementation of NodeDescriptorEvent. + * Use newBuilder to construct an instance. + */ +public final class NodeDescriptorEventImpl implements NodeDescriptorEvent { + private final String identifier; + private final String hostName; + private final int port; + private final int memorySize; + private final Optional<String> rackName; + + private NodeDescriptorEventImpl(final Builder builder) { + this.identifier = BuilderUtils.notNull(builder.identifier); + this.hostName = BuilderUtils.notNull(builder.hostName); + this.port = BuilderUtils.notNull(builder.port); + this.memorySize = BuilderUtils.notNull(builder.memorySize); + this.rackName = Optional.ofNullable(builder.rackName); + } + + @Override + public String getIdentifier() { + return identifier; + } + + @Override + public String getHostName() { + return hostName; + } + + @Override + public int getPort() { + return port; + } + + @Override + public int getMemorySize() { + return memorySize; + } + + @Override + public Optional<String> getRackName() { + return rackName; + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder used to create NodeDescriptorEvent instances. + */ + public static final class Builder implements org.apache.reef.util.Builder<NodeDescriptorEvent> { + private String identifier; + private String hostName; + private Integer port; + private Integer memorySize; + private String rackName; + + /** + * @see NodeDescriptorEvent#getIdentifier() + */ + public Builder setIdentifier(final String identifier) { + this.identifier = identifier; + return this; + } + + /** + * @see NodeDescriptorEvent#getHostName() + */ + public Builder setHostName(final String hostName) { + this.hostName = hostName; + return this; + } + + /** + * @see NodeDescriptorEvent#getPort() + */ + public Builder setPort(final int port) { + this.port = port; + return this; + } + + /** + * @see NodeDescriptorEvent#getMemorySize() + */ + public Builder setMemorySize(final int memorySize) { + this.memorySize = memorySize; + return this; + } + + /** + * @see NodeDescriptorEvent#getRackName() + */ + public Builder setRackName(final String rackName) { + this.rackName = rackName; + return this; + } + + @Override + public NodeDescriptorEvent build() { + return new NodeDescriptorEventImpl(this); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/NodeDescriptorHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/NodeDescriptorHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/NodeDescriptorHandler.java index e5fa95e..4b02f1c 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/NodeDescriptorHandler.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/NodeDescriptorHandler.java @@ -20,7 +20,6 @@ package org.apache.reef.runtime.common.driver.resourcemanager; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; -import org.apache.reef.proto.DriverRuntimeProtocol; import org.apache.reef.runtime.common.driver.catalog.ResourceCatalogImpl; import org.apache.reef.wake.EventHandler; @@ -31,7 +30,7 @@ import javax.inject.Inject; */ @Private @DriverSide -public final class NodeDescriptorHandler implements EventHandler<DriverRuntimeProtocol.NodeDescriptorProto> { +public final class NodeDescriptorHandler implements EventHandler<NodeDescriptorEvent> { private final ResourceCatalogImpl resourceCatalog; @Inject @@ -40,7 +39,7 @@ public final class NodeDescriptorHandler implements EventHandler<DriverRuntimePr } @Override - public void onNext(final DriverRuntimeProtocol.NodeDescriptorProto value) { + public void onNext(final NodeDescriptorEvent value) { this.resourceCatalog.handle(value); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationEvent.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationEvent.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationEvent.java new file mode 100644 index 0000000..7ef7658 --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationEvent.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.common.driver.resourcemanager; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.RuntimeAuthor; +import org.apache.reef.tang.annotations.DefaultImplementation; +import org.apache.reef.util.Optional; + +/** + * Event from Driver Runtime -> Driver Process + * A Resource allocated by the Driver Runtime. In response to a ResourceRequestEvent. + */ +@RuntimeAuthor +@DriverSide +@DefaultImplementation(ResourceAllocationEventImpl.class) +public interface ResourceAllocationEvent { + + /** + * @return Id of the allocated resource + */ + String getIdentifier(); + + /** + * @return Memory size of the resource, in MB + */ + int getResourceMemory(); + + /** + * @return Id of the node where resource was allocated + */ + String getNodeId(); + + /** + * @return Number of virtual CPU cores on the resource + */ + Optional<Integer> getVirtualCores(); +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationEventImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationEventImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationEventImpl.java new file mode 100644 index 0000000..678d4f9 --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationEventImpl.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.common.driver.resourcemanager; + +import org.apache.reef.util.BuilderUtils; +import org.apache.reef.util.Optional; + +/** + * Default POJO implementation of ResourceAllocationEvent. + * Use newBuilder to construct an instance. + */ +public final class ResourceAllocationEventImpl implements ResourceAllocationEvent { + private final String identifier; + private final int resourceMemory; + private final String nodeId; + private final Optional<Integer> virtualCores; + + private ResourceAllocationEventImpl(final Builder builder) { + this.identifier = BuilderUtils.notNull(builder.identifier); + this.resourceMemory = BuilderUtils.notNull(builder.resourceMemory); + this.nodeId = BuilderUtils.notNull(builder.nodeId); + this.virtualCores = Optional.ofNullable(builder.virtualCores); + } + + @Override + public String getIdentifier() { + return identifier; + } + + @Override + public int getResourceMemory() { + return resourceMemory; + } + + @Override + public String getNodeId() { + return nodeId; + } + + @Override + public Optional<Integer> getVirtualCores() { + return virtualCores; + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder used to create ResourceAllocationEvent instances. + */ + public static final class Builder implements org.apache.reef.util.Builder<ResourceAllocationEvent> { + private String identifier; + private Integer resourceMemory; + private String nodeId; + private Integer virtualCores; + + /** + * @see ResourceAllocationEvent#getIdentifier() + */ + public Builder setIdentifier(final String identifier) { + this.identifier = identifier; + return this; + } + + /** + * @see ResourceAllocationEvent#getResourceMemory() + */ + public Builder setResourceMemory(final int resourceMemory) { + this.resourceMemory = resourceMemory; + return this; + } + + /** + * @see ResourceAllocationEvent#getNodeId() + */ + public Builder setNodeId(final String nodeId) { + this.nodeId = nodeId; + return this; + } + + /** + * @see ResourceAllocationEvent#getVirtualCores() + */ + public Builder setVirtualCores(final int virtualCores) { + this.virtualCores = virtualCores; + return this; + } + + @Override + public ResourceAllocationEvent build() { + return new ResourceAllocationEventImpl(this); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationHandler.java index 7d0e1e8..cc802e4 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationHandler.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationHandler.java @@ -20,7 +20,6 @@ package org.apache.reef.runtime.common.driver.resourcemanager; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; -import org.apache.reef.proto.DriverRuntimeProtocol; import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManagerFactory; import org.apache.reef.runtime.common.driver.evaluator.Evaluators; import org.apache.reef.wake.EventHandler; @@ -33,7 +32,7 @@ import javax.inject.Inject; @Private @DriverSide public final class ResourceAllocationHandler - implements EventHandler<DriverRuntimeProtocol.ResourceAllocationProto> { + implements EventHandler<ResourceAllocationEvent> { /** * Helper class to make new EvaluatorManager instances, @@ -54,7 +53,7 @@ public final class ResourceAllocationHandler } @Override - public void onNext(final DriverRuntimeProtocol.ResourceAllocationProto value) { + public void onNext(final ResourceAllocationEvent value) { // FIXME: Using this put() method is a temporary fix for the race condition // described in issues #828 and #839. Use Evaluators.put(EvaluatorManager) instead // when the bug is fixed.
