http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java index c0db65d..461f1c0 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java @@ -20,7 +20,6 @@ package org.apache.reef.runtime.common.driver.resourcemanager; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; -import org.apache.reef.proto.DriverRuntimeProtocol; import org.apache.reef.proto.ReefServiceProtos; import org.apache.reef.runtime.common.driver.DriverStatusManager; import org.apache.reef.runtime.common.driver.idle.DriverIdleManager; @@ -38,7 +37,7 @@ import java.util.logging.Logger; */ @DriverSide @Private -public final class ResourceManagerStatus implements EventHandler<DriverRuntimeProtocol.RuntimeStatusProto>, +public final class ResourceManagerStatus implements EventHandler<RuntimeStatusEvent>, DriverIdlenessSource { private static final Logger LOG = Logger.getLogger(ResourceManagerStatus.class.getName()); @@ -64,22 +63,22 @@ public final class ResourceManagerStatus implements EventHandler<DriverRuntimePr } @Override - public synchronized void onNext(final DriverRuntimeProtocol.RuntimeStatusProto runtimeStatusProto) { - final ReefServiceProtos.State newState = runtimeStatusProto.getState(); - LOG.log(Level.FINEST, "Runtime status " + runtimeStatusProto); - this.outstandingContainerRequests = runtimeStatusProto.getOutstandingContainerRequests(); - this.containerAllocationCount = runtimeStatusProto.getContainerAllocationCount(); - this.setState(runtimeStatusProto.getState()); + public synchronized void onNext(final RuntimeStatusEvent runtimeStatusEvent) { + final ReefServiceProtos.State newState = runtimeStatusEvent.getState(); + LOG.log(Level.FINEST, "Runtime status " + runtimeStatusEvent); + this.outstandingContainerRequests = runtimeStatusEvent.getOutstandingContainerRequests().get(); + this.containerAllocationCount = runtimeStatusEvent.getContainerAllocationList().size(); + this.setState(runtimeStatusEvent.getState()); switch (newState) { case FAILED: - this.onRMFailure(runtimeStatusProto); + this.onRMFailure(runtimeStatusEvent); break; case DONE: - this.onRMDone(runtimeStatusProto); + this.onRMDone(runtimeStatusEvent); break; case RUNNING: - this.onRMRunning(runtimeStatusProto); + this.onRMRunning(runtimeStatusEvent); break; } } @@ -110,19 +109,19 @@ public final class ResourceManagerStatus implements EventHandler<DriverRuntimePr } - private synchronized void onRMFailure(final DriverRuntimeProtocol.RuntimeStatusProto runtimeStatusProto) { - assert (runtimeStatusProto.getState() == ReefServiceProtos.State.FAILED); - this.resourceManagerErrorHandler.onNext(runtimeStatusProto.getError()); + private synchronized void onRMFailure(final RuntimeStatusEvent runtimeStatusEvent) { + assert (runtimeStatusEvent.getState() == ReefServiceProtos.State.FAILED); + this.resourceManagerErrorHandler.onNext(runtimeStatusEvent.getError().get()); } - private synchronized void onRMDone(final DriverRuntimeProtocol.RuntimeStatusProto runtimeStatusProto) { - assert (runtimeStatusProto.getState() == ReefServiceProtos.State.DONE); + private synchronized void onRMDone(final RuntimeStatusEvent runtimeStatusEvent) { + assert (runtimeStatusEvent.getState() == ReefServiceProtos.State.DONE); LOG.log(Level.INFO, "Resource Manager shutdown happened. Triggering Driver shutdown."); this.driverStatusManager.onComplete(); } - private synchronized void onRMRunning(final DriverRuntimeProtocol.RuntimeStatusProto runtimeStatusProto) { - assert (runtimeStatusProto.getState() == ReefServiceProtos.State.RUNNING); + private synchronized void onRMRunning(final RuntimeStatusEvent runtimeStatusEvent) { + assert (runtimeStatusEvent.getState() == ReefServiceProtos.State.RUNNING); if (this.isIdle()) { this.driverIdleManager.get().onPotentiallyIdle(IDLE_MESSAGE); }
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEvent.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEvent.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEvent.java new file mode 100644 index 0000000..1c17b15 --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEvent.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.common.driver.resourcemanager; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.RuntimeAuthor; +import org.apache.reef.proto.ReefServiceProtos; +import org.apache.reef.tang.annotations.DefaultImplementation; +import org.apache.reef.util.Optional; + +/** + * Event from Driver Runtime -> Driver Process + * Status of a resource in the cluster + */ +@RuntimeAuthor +@DriverSide +@DefaultImplementation(ResourceStatusEventImpl.class) +public interface ResourceStatusEvent { + /** + * @return Id of the resource + */ + String getIdentifier(); + + /** + * @return State of the resource + */ + ReefServiceProtos.State getState(); + + /** + * @return Diagnostics from the resource + */ + Optional<String> getDiagnostics(); + + /** + * @return Exit code of the resource, if it has exited + */ + Optional<Integer> getExitCode(); + + /** + * @return If true, this resource is from a previous Driver (the Driver was restarted) + */ + Optional<Boolean> getIsFromPreviousDriver(); +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEventImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEventImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEventImpl.java new file mode 100644 index 0000000..7258c42 --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEventImpl.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.common.driver.resourcemanager; + +import org.apache.reef.proto.ReefServiceProtos; +import org.apache.reef.util.BuilderUtils; +import org.apache.reef.util.Optional; + +/** + * Default POJO implementation of ResourceStatusEvent. + * Use newBuilder to construct an instance. + */ +public final class ResourceStatusEventImpl implements ResourceStatusEvent { + private final String identifier; + private final ReefServiceProtos.State state; + private final Optional<String> diagnostics; + private final Optional<Integer> exitCode; + private final Optional<Boolean> isFromPreviousDriver; + + private ResourceStatusEventImpl(final Builder builder) { + this.identifier = BuilderUtils.notNull(builder.identifier); + this.state = BuilderUtils.notNull(builder.state); + this.diagnostics = Optional.ofNullable(builder.diagnostics); + this.exitCode = Optional.ofNullable(builder.exitCode); + this.isFromPreviousDriver = Optional.ofNullable(builder.isFromPreviousDriver); + } + + @Override + public String getIdentifier() { + return identifier; + } + + @Override + public ReefServiceProtos.State getState() { + return state; + } + + @Override + public Optional<String> getDiagnostics() { + return diagnostics; + } + + @Override + public Optional<Integer> getExitCode() { + return exitCode; + } + + @Override + public Optional<Boolean> getIsFromPreviousDriver() { + return isFromPreviousDriver; + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder used to create ResourceStatusEvent instances. + */ + public static final class Builder implements org.apache.reef.util.Builder<ResourceStatusEvent> { + private String identifier; + private ReefServiceProtos.State state; + private String diagnostics; + private Integer exitCode; + private Boolean isFromPreviousDriver; + + /** + * @see ResourceStatusEvent#getIdentifier() + */ + public Builder setIdentifier(final String identifier) { + this.identifier = identifier; + return this; + } + + /** + * @see ResourceStatusEvent#getState() + */ + public Builder setState(final ReefServiceProtos.State state) { + this.state = state; + return this; + } + + /** + * @see ResourceStatusEvent#getDiagnostics() + */ + public Builder setDiagnostics(final String diagnostics) { + this.diagnostics = diagnostics; + return this; + } + + /** + * @see ResourceStatusEvent#getExitCode() + */ + public Builder setExitCode(final int exitCode) { + this.exitCode = exitCode; + return this; + } + + /** + * @see ResourceStatusEvent#getIsFromPreviousDriver() + */ + public Builder setIsFromPreviousDriver(final boolean isFromPreviousDriver) { + this.isFromPreviousDriver = isFromPreviousDriver; + return this; + } + + @Override + public ResourceStatusEvent build() { + return new ResourceStatusEventImpl(this); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java index 9abafbd..6d31824 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java @@ -19,7 +19,6 @@ package org.apache.reef.runtime.common.driver.resourcemanager; import org.apache.reef.annotations.audience.Private; -import org.apache.reef.proto.DriverRuntimeProtocol; import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager; import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManagerFactory; import org.apache.reef.runtime.common.driver.evaluator.Evaluators; @@ -33,7 +32,7 @@ import javax.inject.Inject; * about the current state of a given resource. Ideally, we should think the same thing. */ @Private -public final class ResourceStatusHandler implements EventHandler<DriverRuntimeProtocol.ResourceStatusProto> { +public final class ResourceStatusHandler implements EventHandler<ResourceStatusEvent> { private final Evaluators evaluators; private final EvaluatorManagerFactory evaluatorManagerFactory; @@ -49,21 +48,21 @@ public final class ResourceStatusHandler implements EventHandler<DriverRuntimePr * about the state of the resource executing an Evaluator; This method simply passes the message * off to the referenced EvaluatorManager * - * @param resourceStatusProto resource status message from the ResourceManager + * @param resourceStatusEvent resource status message from the ResourceManager */ @Override - public void onNext(final DriverRuntimeProtocol.ResourceStatusProto resourceStatusProto) { - final Optional<EvaluatorManager> evaluatorManager = this.evaluators.get(resourceStatusProto.getIdentifier()); + public void onNext(final ResourceStatusEvent resourceStatusEvent) { + final Optional<EvaluatorManager> evaluatorManager = this.evaluators.get(resourceStatusEvent.getIdentifier()); if (evaluatorManager.isPresent()) { - evaluatorManager.get().onResourceStatusMessage(resourceStatusProto); + evaluatorManager.get().onResourceStatusMessage(resourceStatusEvent); } else { - if (resourceStatusProto.getIsFromPreviousDriver()) { - EvaluatorManager previousEvaluatorManager = this.evaluatorManagerFactory.createForEvaluatorFailedDuringDriverRestart(resourceStatusProto); - previousEvaluatorManager.onResourceStatusMessage(resourceStatusProto); + if (resourceStatusEvent.getIsFromPreviousDriver().get()) { + EvaluatorManager previousEvaluatorManager = this.evaluatorManagerFactory.createForEvaluatorFailedDuringDriverRestart(resourceStatusEvent); + previousEvaluatorManager.onResourceStatusMessage(resourceStatusEvent); } else { throw new RuntimeException( - "Unknown resource status from evaluator " + resourceStatusProto.getIdentifier() + - " with state " + resourceStatusProto.getState() + "Unknown resource status from evaluator " + resourceStatusEvent.getIdentifier() + + " with state " + resourceStatusEvent.getState() ); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEvent.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEvent.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEvent.java new file mode 100644 index 0000000..bca7354 --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEvent.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.common.driver.resourcemanager; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.RuntimeAuthor; +import org.apache.reef.proto.ReefServiceProtos; +import org.apache.reef.tang.annotations.DefaultImplementation; +import org.apache.reef.util.Optional; + +import java.util.List; + +/** + * Event from Driver Runtime -> Driver Process + * A status update from the Driver Runtime to the Driver Process + */ +@RuntimeAuthor +@DriverSide +@DefaultImplementation(RuntimeStatusEventImpl.class) +public interface RuntimeStatusEvent { + /** + * @return Name of the Runtime + */ + String getName(); + + /** + * @return State of the Runtime + */ + ReefServiceProtos.State getState(); + + /** + * @return List of allocated containers + */ + List<String> getContainerAllocationList(); + + /** + * @return Error from the Runtime + */ + Optional<ReefServiceProtos.RuntimeErrorProto> getError(); + + /** + * @return Number of outstanding container requests + */ + Optional<Integer> getOutstandingContainerRequests(); +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEventImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEventImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEventImpl.java new file mode 100644 index 0000000..5d8060d --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEventImpl.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.common.driver.resourcemanager; + +import org.apache.reef.proto.ReefServiceProtos; +import org.apache.reef.util.BuilderUtils; +import org.apache.reef.util.Optional; + +import java.util.ArrayList; +import java.util.List; + +/** + * Default POJO implementation of RuntimeStatusEvent. + * Use newBuilder to construct an instance. + */ +public final class RuntimeStatusEventImpl implements RuntimeStatusEvent { + private final String name; + private final ReefServiceProtos.State state; + private final List<String> containerAllocationList; + private final Optional<ReefServiceProtos.RuntimeErrorProto> error; + private final Optional<Integer> outstandingContainerRequests; + + private RuntimeStatusEventImpl(final Builder builder) { + this.name = BuilderUtils.notNull(builder.name); + this.state = BuilderUtils.notNull(builder.state); + this.containerAllocationList = BuilderUtils.notNull(builder.containerAllocationList); + this.error = Optional.ofNullable(builder.error); + this.outstandingContainerRequests = Optional.ofNullable(builder.outstandingContainerRequests); + } + + @Override + public String getName() { + return name; + } + + @Override + public ReefServiceProtos.State getState() { + return state; + } + + @Override + public List<String> getContainerAllocationList() { + return containerAllocationList; + } + + @Override + public Optional<ReefServiceProtos.RuntimeErrorProto> getError() { + return error; + } + + @Override + public Optional<Integer> getOutstandingContainerRequests() { + return outstandingContainerRequests; + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder used to create RuntimeStatusEvent instances. + */ + public static final class Builder implements org.apache.reef.util.Builder<RuntimeStatusEvent> { + private String name; + private ReefServiceProtos.State state; + private List<String> containerAllocationList = new ArrayList<>(); + private ReefServiceProtos.RuntimeErrorProto error; + private Integer outstandingContainerRequests; + + /** + * @see RuntimeStatusEvent#getName() + */ + public Builder setName(final String name) { + this.name = name; + return this; + } + + /** + * @see RuntimeStatusEvent#getState() + */ + public Builder setState(final ReefServiceProtos.State state) { + this.state = state; + return this; + } + + /** + * Add an entry to containerAllocationList + * @see RuntimeStatusEvent#getContainerAllocationList() + */ + public Builder addContainerAllocation(final String containerAllocation) { + this.containerAllocationList.add(containerAllocation); + return this; + } + + /** + * @see RuntimeStatusEvent#getError() + */ + public Builder setError(final ReefServiceProtos.RuntimeErrorProto error) { + this.error = error; + return this; + } + + /** + * @see RuntimeStatusEvent#getOutstandingContainerRequests() + */ + public Builder setOutstandingContainerRequests(final int outstandingContainerRequests) { + this.outstandingContainerRequests = outstandingContainerRequests; + return this; + } + + @Override + public RuntimeStatusEvent build() { + return new RuntimeStatusEventImpl(this); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/FileResource.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/FileResource.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/FileResource.java new file mode 100644 index 0000000..e088afd --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/FileResource.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.common.files; + +import org.apache.reef.annotations.audience.RuntimeAuthor; +import org.apache.reef.tang.annotations.DefaultImplementation; + +/** + * A File Resource with a FileType for use by Runtimes + */ +@RuntimeAuthor +@DefaultImplementation(FileResourceImpl.class) +public interface FileResource { + + /** + * @return Type of the file + */ + FileType getType(); + + /** + * @return Name of the file + */ + String getName(); + + /** + * @return Path of the file + */ + String getPath(); +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/FileResourceImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/FileResourceImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/FileResourceImpl.java new file mode 100644 index 0000000..2f617e8 --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/FileResourceImpl.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.common.files; + +import org.apache.reef.util.BuilderUtils; + +/** + * Default POJO implementation of FileResource. + * Use newBuilder to construct an instance. + */ +public final class FileResourceImpl implements FileResource { + private final FileType type; + private final String name; + private final String path; + + private FileResourceImpl(final Builder builder) { + this.type = BuilderUtils.notNull(builder.type); + this.name = BuilderUtils.notNull(builder.name); + this.path = BuilderUtils.notNull(builder.path); + } + + @Override + public FileType getType() { + return type; + } + + @Override + public String getName() { + return name; + } + + @Override + public String getPath() { + return path; + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder used to create FileResource instances. + */ + public static final class Builder implements org.apache.reef.util.Builder<FileResource> { + private FileType type; + private String name; + private String path; + + /** + * @see FileResource#getType() + */ + public Builder setType(final FileType type) { + this.type = type; + return this; + } + + /** + * @see FileResource#getName() + */ + public Builder setName(final String name) { + this.name = name; + return this; + } + + /** + * @see FileResource#getPath() + */ + public Builder setPath(final String path) { + this.path = path; + return this; + } + + @Override + public FileResource build() { + return new FileResourceImpl(this); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/FileType.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/FileType.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/FileType.java new file mode 100644 index 0000000..1b729ad --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/FileType.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.common.files; + +import org.apache.reef.annotations.audience.RuntimeAuthor; + +/** + * Type of a File Resource used by Runtimes + */ +@RuntimeAuthor +public enum FileType { + PLAIN, + LIB, + ARCHIVE +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/JobJarMaker.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/JobJarMaker.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/JobJarMaker.java index 5297158..ecf3aa8 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/JobJarMaker.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/JobJarMaker.java @@ -21,8 +21,7 @@ package org.apache.reef.runtime.common.files; import org.apache.reef.annotations.audience.ClientSide; import org.apache.reef.annotations.audience.Private; import org.apache.reef.annotations.audience.RuntimeAuthor; -import org.apache.reef.proto.ClientRuntimeProtocol; -import org.apache.reef.proto.ReefServiceProtos; +import org.apache.reef.runtime.common.client.api.JobSubmissionEvent; import org.apache.reef.runtime.common.parameters.DeleteTempFiles; import org.apache.reef.tang.Configuration; import org.apache.reef.tang.annotations.Parameter; @@ -59,13 +58,13 @@ public final class JobJarMaker { this.deleteTempFilesOnExit = deleteTempFilesOnExit; } - public static void copy(final Iterable<ReefServiceProtos.FileResourceProto> files, final File destinationFolder) { + public static void copy(final Iterable<FileResource> files, final File destinationFolder) { if (!destinationFolder.exists()) { destinationFolder.mkdirs(); } - for (final ReefServiceProtos.FileResourceProto fileProto : files) { + for (final FileResource fileProto : files) { final File sourceFile = toFile(fileProto); final File destinationFile = new File(destinationFolder, fileProto.getName()); if (destinationFile.exists()) { @@ -89,12 +88,12 @@ public final class JobJarMaker { } } - private static File toFile(final ReefServiceProtos.FileResourceProto fileProto) { + private static File toFile(final FileResource fileProto) { return new File(fileProto.getPath()); } public File createJobSubmissionJAR( - final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto, + final JobSubmissionEvent jobSubmissionEvent, final Configuration driverConfiguration) throws IOException { // Copy all files to a local job submission folder @@ -104,8 +103,8 @@ public final class JobJarMaker { final File localFolder = new File(jobSubmissionFolder, this.fileNames.getLocalFolderName()); final File globalFolder = new File(jobSubmissionFolder, this.fileNames.getGlobalFolderName()); - this.copy(jobSubmissionProto.getGlobalFileList(), globalFolder); - this.copy(jobSubmissionProto.getLocalFileList(), localFolder); + this.copy(jobSubmissionEvent.getGlobalFileSet(), globalFolder); + this.copy(jobSubmissionEvent.getLocalFileSet(), localFolder); // Store the Driver Configuration in the JAR file. this.configurationSerializer.toFile( http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/ProcessType.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/ProcessType.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/ProcessType.java new file mode 100644 index 0000000..286174e --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/ProcessType.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.common.launch; + +/** + * The type of a process to be launched by the Runtime + */ +public enum ProcessType { + JVM, + CLR +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFMessageCodec.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFMessageCodec.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFMessageCodec.java index 8a81184..39a8f8d 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFMessageCodec.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFMessageCodec.java @@ -52,9 +52,7 @@ public final class REEFMessageCodec implements Codec<GeneratedMessage> { public GeneratedMessage decode(final byte[] bytes) { try { final REEFProtocol.REEFMessage message = REEFProtocol.REEFMessage.parseFrom(bytes); - if (message.hasJobSubmission()) { - return message.getJobSubmission(); - } else if (message.hasJobControl()) { + if (message.hasJobControl()) { return message.getJobControl(); } else if (message.hasRuntimeError()) { return message.getRuntimeError(); @@ -75,9 +73,7 @@ public final class REEFMessageCodec implements Codec<GeneratedMessage> { public byte[] encode(final GeneratedMessage msg) { final REEFProtocol.REEFMessage.Builder message = REEFProtocol.REEFMessage.newBuilder(); - if (msg instanceof ClientRuntimeProtocol.JobSubmissionProto) { - message.setJobSubmission((ClientRuntimeProtocol.JobSubmissionProto) msg); - } else if (msg instanceof ClientRuntimeProtocol.JobControlProto) { + if (msg instanceof ClientRuntimeProtocol.JobControlProto) { message.setJobControl((ClientRuntimeProtocol.JobControlProto) msg); } else if (msg instanceof ReefServiceProtos.RuntimeErrorProto) { message.setRuntimeError((ReefServiceProtos.RuntimeErrorProto) msg); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/util/BuilderUtils.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/util/BuilderUtils.java b/lang/java/reef-common/src/main/java/org/apache/reef/util/BuilderUtils.java new file mode 100644 index 0000000..68cd7ce --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/util/BuilderUtils.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.util; + +/** + * Utilities for creating Builders + */ +public final class BuilderUtils { + /** + * Throws a runtime exception if the parameter is null + */ + public static <T> T notNull(final T parameter) { + if (parameter == null) { + throw new IllegalArgumentException("required parameter"); + } else { + return parameter; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/proto/client_runtime.proto ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/proto/client_runtime.proto b/lang/java/reef-common/src/main/proto/client_runtime.proto index 36c648b..2846619 100644 --- a/lang/java/reef-common/src/main/proto/client_runtime.proto +++ b/lang/java/reef-common/src/main/proto/client_runtime.proto @@ -23,22 +23,6 @@ import "reef_service_protos.proto"; // Messages from REEF Client -> Driver Runtime -message JobSubmissionProto { - required string identifier = 1; // the job identifier - required string remote_id = 2; // the remote identifier - required string configuration = 5; // the runtime configuration - required string user_name = 6; // the user name - - //optional SIZE driver_size = 7; // Removed in REEF 0.3 in favor of driver_memory below. - optional int32 driver_memory = 8; - optional int32 priority = 9; - optional string queue = 10; - - repeated FileResourceProto global_file = 11; // files that should be placed on the driver and all subsequent evaluators - repeated FileResourceProto local_File = 12; // files that should be placed on the driver only - -} - enum Signal { SIG_TERMINATE = 1; SIG_SUSPEND = 2; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/proto/driver_runtime.proto ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/proto/driver_runtime.proto b/lang/java/reef-common/src/main/proto/driver_runtime.proto deleted file mode 100644 index 64f5dc1..0000000 --- a/lang/java/reef-common/src/main/proto/driver_runtime.proto +++ /dev/null @@ -1,89 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -option java_package = "org.apache.reef.proto"; -option java_outer_classname = "DriverRuntimeProtocol"; -option java_generic_services = true; -option java_generate_equals_and_hash = true; - - -import "reef_service_protos.proto"; - -// Messages from Driver Runtime -> Driver Process - -message DriverProcessRegistrationProto { - required string remote_identifier = 1; -} - - -message NodeDescriptorProto { - required string identifier = 1; - required string host_name = 2; // e.g., IP address - required int32 port = 3; // e.g., IP port - required int32 memory_size = 4; - optional string rack_name = 5; // e.g., /default-rack -} - -message ResourceAllocationProto { - required string identifier = 1; // e.g., the container id, or the thread id - required int32 resource_memory = 2; // megabytes - required string node_id = 3; - optional int32 virtual_cores = 4; -} - -message ResourceStatusProto { - required string identifier = 1; - required State state = 2; - optional string diagnostics = 3; - optional int32 exit_code = 4; - optional bool is_from_previous_driver = 5; -} - -message RuntimeStatusProto { - required string name = 1; // e.g., local, yarn21 - required State state = 2; - optional RuntimeErrorProto error = 3; // runtime (e.g., YARN) error - - optional int32 outstanding_container_requests = 5; - repeated string container_allocation = 6; -} - -////////////////////////////////////////////////////// -// Messages from Driver Process -> Driver Runtime - -message ResourceRequestProto { - // optional SIZE resource_size = 1; // Removed in REEF 0.3 in favor of memory_size. - optional int32 memory_size = 2; // Memory size of the evaluator in MB - optional int32 priority = 3; - optional int32 virtual_cores = 4; - required int32 resource_count = 5; - repeated string node_name = 6; // a list of specific nodes - repeated string rack_name = 7; // a list of specific racks - - optional bool relax_locality = 10; -} - -message ResourceReleaseProto { - required string identifier = 1; -} - -message ResourceLaunchProto { - required string identifier = 1; - required string remote_id = 2; - required string evaluator_conf = 3; - required ProcessType type = 4; - repeated FileResourceProto file = 10; -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/proto/reef_protocol.proto ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/proto/reef_protocol.proto b/lang/java/reef-common/src/main/proto/reef_protocol.proto index a8c793f..09c4476 100644 --- a/lang/java/reef-common/src/main/proto/reef_protocol.proto +++ b/lang/java/reef-common/src/main/proto/reef_protocol.proto @@ -30,8 +30,9 @@ option java_generate_equals_and_hash = true; option java_outer_classname = "REEFProtocol"; message REEFMessage { + // Field 1 removed + // Messages defined in client_runtime.proto - optional JobSubmissionProto jobSubmission = 1; optional JobControlProto jobControl = 2; // Messages defined in reef_service_protos.proto optional RuntimeErrorProto runtimeError = 3; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/proto/reef_service_protos.proto ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/proto/reef_service_protos.proto b/lang/java/reef-common/src/main/proto/reef_service_protos.proto index 7494737..38d2b39 100644 --- a/lang/java/reef-common/src/main/proto/reef_service_protos.proto +++ b/lang/java/reef-common/src/main/proto/reef_service_protos.proto @@ -32,31 +32,6 @@ enum State { KILLED = 5; } -enum FileType { - PLAIN = 0; - LIB = 1; - ARCHIVE = 2; -} - -// Removed in REEF 0.3 in favor of explicit memory sizes. -// enum SIZE { -// SMALL = 0; -// MEDIUM = 1; -// LARGE = 2; -// XLARGE = 3; -//} - -enum ProcessType { - JVM = 0; - CLR = 1; -} - -message FileResourceProto { - required FileType type = 1; - required string name = 2; - required string path = 3; -} - message RuntimeErrorProto { required string name = 1; // e.g., local, yarn21 required string message = 2; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImplTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImplTest.java b/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImplTest.java index ddbd83b..dce5f50 100644 --- a/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImplTest.java +++ b/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImplTest.java @@ -21,7 +21,7 @@ package org.apache.reef.runtime.common.driver; import org.apache.reef.driver.catalog.ResourceCatalog; import org.apache.reef.driver.evaluator.EvaluatorRequest; import org.apache.reef.driver.evaluator.EvaluatorRequestor; -import org.apache.reef.proto.DriverRuntimeProtocol; +import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent; import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler; import org.apache.reef.tang.Tang; import org.apache.reef.tang.exceptions.InjectionException; @@ -53,8 +53,8 @@ public class EvaluatorRequestorImplTest { final DummyRequestHandler requestHandler = new DummyRequestHandler(); final EvaluatorRequestor evaluatorRequestor = new EvaluatorRequestorImpl(resourceCatalog, requestHandler, loggingScopeFactory); evaluatorRequestor.submit(EvaluatorRequest.newBuilder().setMemory(memory).build()); - Assert.assertEquals("Memory request did not make it", requestHandler.get().getMemorySize(), memory); - Assert.assertEquals("Number of requests did not make it", requestHandler.get().getResourceCount(), 1); + Assert.assertEquals("Memory request did not make it", memory, requestHandler.get().getMemorySize().get().intValue()); + Assert.assertEquals("Number of requests did not make it", 1, requestHandler.get().getResourceCount()); } /** @@ -67,8 +67,8 @@ public class EvaluatorRequestorImplTest { final DummyRequestHandler requestHandler = new DummyRequestHandler(); final EvaluatorRequestor evaluatorRequestor = new EvaluatorRequestorImpl(resourceCatalog, requestHandler, loggingScopeFactory); evaluatorRequestor.submit(EvaluatorRequest.newBuilder().setMemory(memory).setNumber(count).build()); - Assert.assertEquals("Memory request did not make it", requestHandler.get().getMemorySize(), memory); - Assert.assertEquals("Number of requests did not make it", requestHandler.get().getResourceCount(), count); + Assert.assertEquals("Memory request did not make it", memory, requestHandler.get().getMemorySize().get().intValue()); + Assert.assertEquals("Number of requests did not make it", count, requestHandler.get().getResourceCount()); } /** @@ -96,14 +96,14 @@ public class EvaluatorRequestorImplTest { } private class DummyRequestHandler implements ResourceRequestHandler { - private DriverRuntimeProtocol.ResourceRequestProto request; + private ResourceRequestEvent request; @Override - public void onNext(DriverRuntimeProtocol.ResourceRequestProto resourceRequestProto) { - this.request = resourceRequestProto; + public void onNext(final ResourceRequestEvent resourceRequestEvent) { + this.request = resourceRequestEvent; } - public DriverRuntimeProtocol.ResourceRequestProto get() { + public ResourceRequestEvent get() { return this.request; } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/driver/catalog/CatalogTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/driver/catalog/CatalogTest.java b/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/driver/catalog/CatalogTest.java index 66203b2..72503d0 100644 --- a/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/driver/catalog/CatalogTest.java +++ b/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/driver/catalog/CatalogTest.java @@ -18,7 +18,7 @@ */ package org.apache.reef.runtime.common.driver.catalog; -import org.apache.reef.proto.DriverRuntimeProtocol; +import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEventImpl; import org.junit.Assert; import org.junit.Test; @@ -35,7 +35,7 @@ public final class CatalogTest { final ResourceCatalogImpl catalog = new ResourceCatalogImpl(); for (int i = 0; i < nodes; i++) { - catalog.handle(DriverRuntimeProtocol.NodeDescriptorProto.newBuilder() + catalog.handle(NodeDescriptorEventImpl.newBuilder() .setRackName("test-rack") .setHostName("test-" + i) .setPort(0) http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java index 7ac314a..dc56abf 100644 --- a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java +++ b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java @@ -22,7 +22,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.reef.annotations.audience.ClientSide; import org.apache.reef.annotations.audience.Private; -import org.apache.reef.proto.ClientRuntimeProtocol; +import org.apache.reef.runtime.common.client.api.JobSubmissionEvent; import org.apache.reef.runtime.common.client.api.JobSubmissionHandler; import org.apache.reef.runtime.common.files.ClasspathProvider; import org.apache.reef.runtime.common.files.JobJarMaker; @@ -33,7 +33,6 @@ import org.apache.reef.runtime.hdinsight.client.yarnrest.*; import org.apache.reef.tang.Configuration; import org.apache.reef.tang.Configurations; import org.apache.reef.tang.annotations.Parameter; -import org.apache.reef.tang.formats.ConfigurationSerializer; import javax.inject.Inject; import java.io.File; @@ -54,7 +53,6 @@ public final class HDInsightJobSubmissionHandler implements JobSubmissionHandler private final AzureUploader uploader; private final JobJarMaker jobJarMaker; private final HDInsightInstance hdInsightInstance; - private final ConfigurationSerializer configurationSerializer; private final REEFFileNames filenames; private final ClasspathProvider classpath; private final double jvmHeapSlack; @@ -63,14 +61,12 @@ public final class HDInsightJobSubmissionHandler implements JobSubmissionHandler HDInsightJobSubmissionHandler(final AzureUploader uploader, final JobJarMaker jobJarMaker, final HDInsightInstance hdInsightInstance, - final ConfigurationSerializer configurationSerializer, final REEFFileNames filenames, final ClasspathProvider classpath, final @Parameter(JVMHeapSlack.class) double jvmHeapSlack) { this.uploader = uploader; this.jobJarMaker = jobJarMaker; this.hdInsightInstance = hdInsightInstance; - this.configurationSerializer = configurationSerializer; this.filenames = filenames; this.classpath = classpath; this.jvmHeapSlack = jvmHeapSlack; @@ -82,7 +78,7 @@ public final class HDInsightJobSubmissionHandler implements JobSubmissionHandler } @Override - public void onNext(final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) { + public void onNext(final JobSubmissionEvent jobSubmissionEvent) { try { @@ -96,25 +92,25 @@ public final class HDInsightJobSubmissionHandler implements JobSubmissionHandler LOG.log(Level.FINE, "Assembling Configuration for the Driver."); final Configuration driverConfiguration = - makeDriverConfiguration(jobSubmissionProto, applicationID.getId(), jobFolderURL); + makeDriverConfiguration(jobSubmissionEvent, applicationID.getId(), jobFolderURL); LOG.log(Level.FINE, "Making Job JAR."); final File jobSubmissionJarFile = - this.jobJarMaker.createJobSubmissionJAR(jobSubmissionProto, driverConfiguration); + this.jobJarMaker.createJobSubmissionJAR(jobSubmissionEvent, driverConfiguration); LOG.log(Level.FINE, "Uploading Job JAR to Azure."); final FileResource uploadedFile = this.uploader.uploadFile(jobSubmissionJarFile); LOG.log(Level.FINE, "Assembling application submission."); - final String command = getCommandString(jobSubmissionProto); + final String command = getCommandString(jobSubmissionEvent); final ApplicationSubmission applicationSubmission = new ApplicationSubmission() .setApplicationId(applicationID.getId()) - .setApplicationName(jobSubmissionProto.getIdentifier()) - .setResource(getResource(jobSubmissionProto)) + .setApplicationName(jobSubmissionEvent.getIdentifier()) + .setResource(getResource(jobSubmissionEvent)) .setContainerInfo(new ContainerInfo() - .addFileResource(this.filenames.getREEFFolderName(), uploadedFile) - .addCommand(command)); + .addFileResource(this.filenames.getREEFFolderName(), uploadedFile) + .addCommand(command)); this.hdInsightInstance.submitApplication(applicationSubmission); LOG.log(Level.INFO, "Submitted application to HDInsight. The application id is: {0}", applicationID.getId()); @@ -126,13 +122,13 @@ public final class HDInsightJobSubmissionHandler implements JobSubmissionHandler } /** - * Extracts the resource demands from the jobSubmissionProto. + * Extracts the resource demands from the jobSubmissionEvent. */ private final Resource getResource( - final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) { + final JobSubmissionEvent jobSubmissionEvent) { return new Resource() - .setMemory(String.valueOf(jobSubmissionProto.getDriverMemory())) + .setMemory(String.valueOf(jobSubmissionEvent.getDriverMemory().get())) .setvCores("1"); } @@ -140,30 +136,30 @@ public final class HDInsightJobSubmissionHandler implements JobSubmissionHandler * Assembles the command to execute the Driver. */ private String getCommandString( - final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) { - return StringUtils.join(getCommandList(jobSubmissionProto), ' '); + final JobSubmissionEvent jobSubmissionEvent) { + return StringUtils.join(getCommandList(jobSubmissionEvent), ' '); } /** * Assembles the command to execute the Driver in list form. */ private List<String> getCommandList( - final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) { + final JobSubmissionEvent jobSubmissionEvent) { return new JavaLaunchCommandBuilder() .setJavaPath("%JAVA_HOME%/bin/java") - .setErrorHandlerRID(jobSubmissionProto.getRemoteId()) - .setLaunchID(jobSubmissionProto.getIdentifier()) + .setErrorHandlerRID(jobSubmissionEvent.getRemoteId()) + .setLaunchID(jobSubmissionEvent.getIdentifier()) .setConfigurationFileName(this.filenames.getDriverConfigurationPath()) .setClassPath(this.classpath.getDriverClasspath()) - .setMemory(jobSubmissionProto.getDriverMemory()) + .setMemory(jobSubmissionEvent.getDriverMemory().get()) .setStandardErr(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.filenames.getDriverStderrFileName()) .setStandardOut(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.filenames.getDriverStdoutFileName()) .build(); } private Configuration makeDriverConfiguration( - final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto, + final JobSubmissionEvent jobSubmissionEvent, final String applicationId, final String jobFolderURL) throws IOException { @@ -174,7 +170,7 @@ public final class HDInsightJobSubmissionHandler implements JobSubmissionHandler .build(); return Configurations.merge( - this.configurationSerializer.fromString(jobSubmissionProto.getConfiguration()), + jobSubmissionEvent.getConfiguration(), hdinsightDriverConfiguration); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/DriverFiles.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/DriverFiles.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/DriverFiles.java index c54f9e0..42ced95 100644 --- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/DriverFiles.java +++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/DriverFiles.java @@ -18,8 +18,9 @@ */ package org.apache.reef.runtime.local.client; -import org.apache.reef.proto.ClientRuntimeProtocol; -import org.apache.reef.proto.ReefServiceProtos; +import org.apache.reef.runtime.common.client.api.JobSubmissionEvent; +import org.apache.reef.runtime.common.files.FileResource; +import org.apache.reef.runtime.common.files.FileType; import org.apache.reef.runtime.common.files.REEFFileNames; import org.apache.reef.tang.formats.ConfigurationModule; import org.apache.reef.tang.formats.OptionalParameter; @@ -51,28 +52,28 @@ final class DriverFiles { /** * Instantiates an instance based on the given JobSubmissionProto. * - * @param jobSubmissionProto the JobSubmissionProto to parse. + * @param jobSubmissionEvent the JobSubmissionProto to parse. * @return a DriverFiles instance pre-populated with the information from the given JobSubmissionProto. * @throws IOException */ public static DriverFiles fromJobSubmission( - final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto, + final JobSubmissionEvent jobSubmissionEvent, final REEFFileNames fileNames) throws IOException { final DriverFiles driverFiles = new DriverFiles(fileNames); - for (final ReefServiceProtos.FileResourceProto frp : jobSubmissionProto.getGlobalFileList()) { + for (final FileResource frp : jobSubmissionEvent.getGlobalFileSet()) { final File f = new File(frp.getPath()); - if (frp.getType() == ReefServiceProtos.FileType.LIB) { + if (frp.getType() == FileType.LIB) { driverFiles.addGlobalLib(f); } else { driverFiles.addGlobalFile(f); } } - for (final ReefServiceProtos.FileResourceProto frp : jobSubmissionProto.getLocalFileList()) { + for (final FileResource frp : jobSubmissionEvent.getLocalFileSet()) { final File f = new File(frp.getPath()); - if (frp.getType() == ReefServiceProtos.FileType.LIB) { + if (frp.getType() == FileType.LIB) { driverFiles.addLocalLib(f); } else { driverFiles.addLocalFile(f); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java index d417fce..f4befc1 100644 --- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java +++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java @@ -20,14 +20,11 @@ package org.apache.reef.runtime.local.client; import org.apache.reef.annotations.audience.ClientSide; import org.apache.reef.annotations.audience.Private; -import org.apache.reef.proto.ClientRuntimeProtocol; +import org.apache.reef.runtime.common.client.api.JobSubmissionEvent; import org.apache.reef.runtime.common.client.api.JobSubmissionHandler; import org.apache.reef.runtime.common.files.ClasspathProvider; import org.apache.reef.runtime.common.files.REEFFileNames; -import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder; import org.apache.reef.runtime.local.client.parameters.RootFolder; -import org.apache.reef.runtime.local.process.LoggingRunnableProcessObserver; -import org.apache.reef.runtime.local.process.RunnableProcess; import org.apache.reef.tang.Configuration; import org.apache.reef.tang.annotations.Parameter; import org.apache.reef.tang.formats.ConfigurationSerializer; @@ -53,7 +50,6 @@ final class LocalJobSubmissionHandler implements JobSubmissionHandler { private final String rootFolderName; private final ConfigurationSerializer configurationSerializer; private final REEFFileNames fileNames; - private final ClasspathProvider classpath; private final PreparedDriverFolderLauncher driverLauncher; private final LoggingScopeFactory loggingScopeFactory; private final DriverConfigurationProvider driverConfigurationProvider; @@ -64,7 +60,6 @@ final class LocalJobSubmissionHandler implements JobSubmissionHandler { final @Parameter(RootFolder.class) String rootFolderName, final ConfigurationSerializer configurationSerializer, final REEFFileNames fileNames, - final ClasspathProvider classpath, final PreparedDriverFolderLauncher driverLauncher, final LoggingScopeFactory loggingScopeFactory, @@ -73,7 +68,6 @@ final class LocalJobSubmissionHandler implements JobSubmissionHandler { this.executor = executor; this.configurationSerializer = configurationSerializer; this.fileNames = fileNames; - this.classpath = classpath; this.driverLauncher = driverLauncher; this.driverConfigurationProvider = driverConfigurationProvider; @@ -89,7 +83,7 @@ final class LocalJobSubmissionHandler implements JobSubmissionHandler { } @Override - public final void onNext(final ClientRuntimeProtocol.JobSubmissionProto t) { + public final void onNext(final JobSubmissionEvent t) { try (final LoggingScope lf = loggingScopeFactory.localJobSubmission()) { try { LOG.log(Level.FINEST, "Starting local job {0}", t.getIdentifier()); @@ -104,8 +98,7 @@ final class LocalJobSubmissionHandler implements JobSubmissionHandler { driverFiles.copyTo(driverFolder); final Configuration driverConfiguration = this.driverConfigurationProvider - .getDriverConfiguration(jobFolder, t.getRemoteId(), t.getIdentifier(), - configurationSerializer.fromString(t.getConfiguration())); + .getDriverConfiguration(jobFolder, t.getRemoteId(), t.getIdentifier(), t.getConfiguration()); this.configurationSerializer.toFile(driverConfiguration, new File(driverFolder, this.fileNames.getDriverConfigurationPath())); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java index 65290c1..9127da9 100644 --- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java +++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java @@ -21,9 +21,10 @@ package org.apache.reef.runtime.local.driver; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; import org.apache.reef.client.FailedRuntime; -import org.apache.reef.proto.DriverRuntimeProtocol; import org.apache.reef.proto.ReefServiceProtos; import org.apache.reef.runtime.common.driver.api.RuntimeParameters; +import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEvent; +import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEventImpl; import org.apache.reef.runtime.common.files.REEFFileNames; import org.apache.reef.runtime.common.utils.RemoteManager; import org.apache.reef.runtime.local.client.parameters.MaxNumberOfEvaluators; @@ -68,7 +69,7 @@ final class ContainerManager implements AutoCloseable { private final String errorHandlerRID; private final int capacity; - private final EventHandler<DriverRuntimeProtocol.NodeDescriptorProto> nodeDescriptorHandler; + private final EventHandler<NodeDescriptorEvent> nodeDescriptorHandler; private final File rootFolder; private final REEFFileNames fileNames; private final ReefRunnableProcessObserver processObserver; @@ -82,10 +83,9 @@ final class ContainerManager implements AutoCloseable { final @Parameter(MaxNumberOfEvaluators.class) int capacity, final @Parameter(RootFolder.class) String rootFolderName, final @Parameter(RuntimeParameters.NodeDescriptorHandler.class) - EventHandler<DriverRuntimeProtocol.NodeDescriptorProto> nodeDescriptorHandler, + EventHandler<NodeDescriptorEvent> nodeDescriptorHandler, final ReefRunnableProcessObserver processObserver, final LocalAddressProvider localAddressProvider) { - this.capacity = capacity; this.fileNames = fileNames; this.processObserver = processObserver; @@ -131,7 +131,7 @@ final class ContainerManager implements AutoCloseable { for (int i = 0; i < capacity; i++) { final String id = idmaker.getNextID(); this.freeNodeList.add(id); - nodeDescriptorHandler.onNext(DriverRuntimeProtocol.NodeDescriptorProto.newBuilder() + nodeDescriptorHandler.onNext(NodeDescriptorEventImpl.newBuilder() .setIdentifier(id) .setRackName("/default-rack") .setHostName(this.localAddress) http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceLaunchHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceLaunchHandler.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceLaunchHandler.java index be3ead8..bf4a177 100644 --- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceLaunchHandler.java +++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceLaunchHandler.java @@ -20,7 +20,7 @@ package org.apache.reef.runtime.local.driver; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; -import org.apache.reef.proto.DriverRuntimeProtocol; +import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent; import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler; import javax.inject.Inject; @@ -40,7 +40,7 @@ final class LocalResourceLaunchHandler implements ResourceLaunchHandler { } @Override - public void onNext(final DriverRuntimeProtocol.ResourceLaunchProto t) { + public void onNext(final ResourceLaunchEvent t) { this.resourceManager.onResourceLaunchRequest(t); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceReleaseHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceReleaseHandler.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceReleaseHandler.java index 04c2730..514a297 100644 --- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceReleaseHandler.java +++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceReleaseHandler.java @@ -20,7 +20,7 @@ package org.apache.reef.runtime.local.driver; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; -import org.apache.reef.proto.DriverRuntimeProtocol; +import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent; import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler; import javax.inject.Inject; @@ -41,7 +41,7 @@ public final class LocalResourceReleaseHandler implements ResourceReleaseHandler } @Override - public void onNext(final DriverRuntimeProtocol.ResourceReleaseProto t) { + public void onNext(final ResourceReleaseEvent t) { this.resourceManager.onResourceReleaseRequest(t); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceRequestHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceRequestHandler.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceRequestHandler.java index a08b05c..99ae748 100644 --- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceRequestHandler.java +++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceRequestHandler.java @@ -20,7 +20,7 @@ package org.apache.reef.runtime.local.driver; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; -import org.apache.reef.proto.DriverRuntimeProtocol; +import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent; import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler; import javax.inject.Inject; @@ -40,7 +40,7 @@ final class LocalResourceRequestHandler implements ResourceRequestHandler { } @Override - public void onNext(final DriverRuntimeProtocol.ResourceRequestProto t) { + public void onNext(final ResourceRequestEvent t) { this.resourceManager.onResourceRequest(t); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java index e30eba3..7d190cb 100644 --- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java +++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java @@ -20,10 +20,17 @@ package org.apache.reef.runtime.local.driver; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; -import org.apache.reef.proto.DriverRuntimeProtocol; import org.apache.reef.proto.ReefServiceProtos; +import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent; +import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent; +import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent; import org.apache.reef.runtime.common.driver.api.RuntimeParameters; +import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent; +import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEventImpl; +import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEvent; +import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl; import org.apache.reef.runtime.common.files.ClasspathProvider; +import org.apache.reef.runtime.common.files.FileResource; import org.apache.reef.runtime.common.files.REEFFileNames; import org.apache.reef.runtime.common.launch.CLRLaunchCommandBuilder; import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder; @@ -58,9 +65,9 @@ public final class ResourceManager { private final ResourceRequestQueue requestQueue = new ResourceRequestQueue(); - private final EventHandler<DriverRuntimeProtocol.ResourceAllocationProto> allocationHandler; + private final EventHandler<ResourceAllocationEvent> allocationHandler; private final ContainerManager theContainers; - private final EventHandler<DriverRuntimeProtocol.RuntimeStatusProto> runtimeStatusHandlerEventHandler; + private final EventHandler<RuntimeStatusEvent> runtimeStatusHandlerEventHandler; private final int defaultMemorySize; private final int defaultNumberOfCores; private final ConfigurationSerializer configurationSerializer; @@ -73,8 +80,8 @@ public final class ResourceManager { @Inject ResourceManager( final ContainerManager containerManager, - final @Parameter(RuntimeParameters.ResourceAllocationHandler.class) EventHandler<DriverRuntimeProtocol.ResourceAllocationProto> allocationHandler, - final @Parameter(RuntimeParameters.RuntimeStatusHandler.class) EventHandler<DriverRuntimeProtocol.RuntimeStatusProto> runtimeStatusHandlerEventHandler, + final @Parameter(RuntimeParameters.ResourceAllocationHandler.class) EventHandler<ResourceAllocationEvent> allocationHandler, + final @Parameter(RuntimeParameters.RuntimeStatusHandler.class) EventHandler<RuntimeStatusEvent> runtimeStatusHandlerEventHandler, final @Parameter(DefaultMemorySize.class) int defaultMemorySize, final @Parameter(DefaultNumberOfCores.class) int defaultNumberOfCores, final @Parameter(JVMHeapSlack.class) double jvmHeapSlack, @@ -105,9 +112,9 @@ public final class ResourceManager { * @param launchRequest the ResourceLaunchProto to parse * @return a list of files set in the given ResourceLaunchProto */ - private static List<File> getLocalFiles(final DriverRuntimeProtocol.ResourceLaunchProto launchRequest) { + private static List<File> getLocalFiles(final ResourceLaunchEvent launchRequest) { final List<File> files = new ArrayList<>(); // Libraries local to this evaluator - for (final ReefServiceProtos.FileResourceProto frp : launchRequest.getFileList()) { + for (final FileResource frp : launchRequest.getFileSet()) { files.add(new File(frp.getPath()).getAbsoluteFile()); } return files; @@ -120,7 +127,7 @@ public final class ResourceManager { * * @param resourceRequest the resource request to be handled. */ - final void onResourceRequest(final DriverRuntimeProtocol.ResourceRequestProto resourceRequest) { + final void onResourceRequest(final ResourceRequestEvent resourceRequest) { synchronized (this.theContainers) { this.requestQueue.add(new ResourceRequest(resourceRequest)); this.checkRequestQueue(); @@ -132,7 +139,7 @@ public final class ResourceManager { * * @param releaseRequest the release request to be processed */ - final void onResourceReleaseRequest(final DriverRuntimeProtocol.ResourceReleaseProto releaseRequest) { + final void onResourceReleaseRequest(final ResourceReleaseEvent releaseRequest) { synchronized (this.theContainers) { LOG.log(Level.FINEST, "Release container: {0}", releaseRequest.getIdentifier()); this.theContainers.release(releaseRequest.getIdentifier()); @@ -158,7 +165,7 @@ public final class ResourceManager { * @param launchRequest the launch request to be processed. */ final void onResourceLaunchRequest( - final DriverRuntimeProtocol.ResourceLaunchProto launchRequest) { + final ResourceLaunchEvent launchRequest) { synchronized (this.theContainers) { @@ -173,8 +180,7 @@ public final class ResourceManager { final File evaluatorConfigurationFile = new File(c.getFolder(), fileNames.getEvaluatorConfigurationPath()); try { - this.configurationSerializer.toFile(this.configurationSerializer.fromString(launchRequest.getEvaluatorConf()), - evaluatorConfigurationFile); + this.configurationSerializer.toFile(launchRequest.getEvaluatorConf(), evaluatorConfigurationFile); } catch (final IOException | BindException e) { throw new RuntimeException("Unable to write configuration.", e); } @@ -218,16 +224,16 @@ public final class ResourceManager { if (this.theContainers.hasContainerAvailable() && this.requestQueue.hasOutStandingRequests()) { // Record the satisfaction of one request and get its details. - final DriverRuntimeProtocol.ResourceRequestProto requestProto = this.requestQueue.satisfyOne(); + final ResourceRequestEvent requestEvent = this.requestQueue.satisfyOne(); // Allocate a Container final Container container = this.theContainers.allocateOne( - requestProto.hasMemorySize() ? requestProto.getMemorySize() : this.defaultMemorySize, - requestProto.hasVirtualCores() ? requestProto.getVirtualCores() : this.defaultNumberOfCores); + requestEvent.getMemorySize().orElse(this.defaultMemorySize), + requestEvent.getVirtualCores().orElse(this.defaultNumberOfCores)); // Tell the receivers about it - final DriverRuntimeProtocol.ResourceAllocationProto alloc = - DriverRuntimeProtocol.ResourceAllocationProto.newBuilder() + final ResourceAllocationEvent alloc = + ResourceAllocationEventImpl.newBuilder() .setIdentifier(container.getContainerID()) .setNodeId(container.getNodeID()) .setResourceMemory(container.getMemory()) @@ -250,16 +256,18 @@ public final class ResourceManager { private void sendRuntimeStatus() { - final DriverRuntimeProtocol.RuntimeStatusProto msg = - DriverRuntimeProtocol.RuntimeStatusProto.newBuilder() + final RuntimeStatusEventImpl.Builder builder = + RuntimeStatusEventImpl.newBuilder() .setName("LOCAL") .setState(ReefServiceProtos.State.RUNNING) - .setOutstandingContainerRequests(this.requestQueue.getNumberOfOutstandingRequests()) - .addAllContainerAllocation(this.theContainers.getAllocatedContainerIDs()) - .build(); + .setOutstandingContainerRequests(this.requestQueue.getNumberOfOutstandingRequests()); + for (final String containerAllocation : this.theContainers.getAllocatedContainerIDs()) { + builder.addContainerAllocation(containerAllocation); + } + final RuntimeStatusEvent msg = builder.build(); LOG.log(Level.INFO, "Allocated: {0}, Outstanding requests: {1}", - new Object[]{msg.getContainerAllocationCount(), msg.getOutstandingContainerRequests()}); + new Object[]{msg.getContainerAllocationList().size(), msg.getOutstandingContainerRequests()}); this.runtimeStatusHandlerEventHandler.onNext(msg); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequest.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequest.java index bbb20a3..4b275cb 100644 --- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequest.java +++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequest.java @@ -20,21 +20,21 @@ package org.apache.reef.runtime.local.driver; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; -import org.apache.reef.proto.DriverRuntimeProtocol; +import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent; /** - * Manages a ResourceRequestProto and its satisfaction. + * Manages a ResourceRequestEvent and its satisfaction. */ @Private @DriverSide final class ResourceRequest { - private final DriverRuntimeProtocol.ResourceRequestProto req; + private final ResourceRequestEvent req; private int satisfied = 0; - ResourceRequest(final DriverRuntimeProtocol.ResourceRequestProto req) { + ResourceRequest(final ResourceRequestEvent req) { if (null == req) { - throw new IllegalArgumentException("Can't instantiate a ResourceRequest without a ResourceRequestProto"); + throw new IllegalArgumentException("Can't instantiate a ResourceRequest without a ResourceRequestEvent"); } this.req = req; } @@ -57,7 +57,7 @@ final class ResourceRequest { return this.satisfied == req.getResourceCount(); } - final DriverRuntimeProtocol.ResourceRequestProto getRequestProto() { + final ResourceRequestEvent getRequestProto() { return this.req; } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequestQueue.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequestQueue.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequestQueue.java index c2a87ff..dcebe20 100644 --- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequestQueue.java +++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequestQueue.java @@ -20,7 +20,7 @@ package org.apache.reef.runtime.local.driver; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; -import org.apache.reef.proto.DriverRuntimeProtocol; +import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -54,7 +54,7 @@ final class ResourceRequestQueue { * Satisfies one resource for the front-most request. If that satisfies the * request, it is removed from the queue. */ - final synchronized DriverRuntimeProtocol.ResourceRequestProto satisfyOne() { + final synchronized ResourceRequestEvent satisfyOne() { final ResourceRequest req = this.requestQueue.element(); req.satisfyOne(); if (req.isSatisfied()) { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/ReefRunnableProcessObserver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/ReefRunnableProcessObserver.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/ReefRunnableProcessObserver.java index 36ee916..3d5051a 100644 --- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/ReefRunnableProcessObserver.java +++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/ReefRunnableProcessObserver.java @@ -19,9 +19,10 @@ package org.apache.reef.runtime.local.process; import net.jcip.annotations.ThreadSafe; -import org.apache.reef.proto.DriverRuntimeProtocol; import org.apache.reef.proto.ReefServiceProtos; import org.apache.reef.runtime.common.driver.api.RuntimeParameters; +import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent; +import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl; import org.apache.reef.runtime.local.driver.ResourceManager; import org.apache.reef.tang.InjectionFuture; import org.apache.reef.tang.annotations.Parameter; @@ -38,7 +39,7 @@ import java.util.logging.Logger; public final class ReefRunnableProcessObserver implements RunnableProcessObserver { private static final Logger LOG = Logger.getLogger(ReefRunnableProcessObserver.class.getName()); - private final EventHandler<DriverRuntimeProtocol.ResourceStatusProto> resourceStatusHandler; + private final EventHandler<ResourceStatusEvent> resourceStatusHandler; private final InjectionFuture<ResourceManager> resourceManager; /** @@ -46,7 +47,7 @@ public final class ReefRunnableProcessObserver implements RunnableProcessObserve */ @Inject public ReefRunnableProcessObserver(final @Parameter(RuntimeParameters.ResourceStatusHandler.class) - EventHandler<DriverRuntimeProtocol.ResourceStatusProto> resourceStatusHandler, + EventHandler<ResourceStatusEvent> resourceStatusHandler, final InjectionFuture<ResourceManager> resourceManager) { this.resourceStatusHandler = resourceStatusHandler; this.resourceManager = resourceManager; @@ -55,7 +56,7 @@ public final class ReefRunnableProcessObserver implements RunnableProcessObserve @Override public void onProcessStarted(final String processId) { this.onResourceStatus( - DriverRuntimeProtocol.ResourceStatusProto.newBuilder() + ResourceStatusEventImpl.newBuilder() .setIdentifier(processId) .setState(ReefServiceProtos.State.RUNNING) .build() @@ -84,7 +85,7 @@ public final class ReefRunnableProcessObserver implements RunnableProcessObserve */ private void onCleanExit(final String processId) { this.onResourceStatus( - DriverRuntimeProtocol.ResourceStatusProto.newBuilder() + ResourceStatusEventImpl.newBuilder() .setIdentifier(processId) .setState(ReefServiceProtos.State.DONE) .setExitCode(0) @@ -100,7 +101,7 @@ public final class ReefRunnableProcessObserver implements RunnableProcessObserve */ private void onUncleanExit(final String processId, final int exitCode) { this.onResourceStatus( - DriverRuntimeProtocol.ResourceStatusProto.newBuilder() + ResourceStatusEventImpl.newBuilder() .setIdentifier(processId) .setState(ReefServiceProtos.State.FAILED) .setExitCode(exitCode) @@ -108,7 +109,7 @@ public final class ReefRunnableProcessObserver implements RunnableProcessObserve ); } - private void onResourceStatus(final DriverRuntimeProtocol.ResourceStatusProto resourceStatus) { + private void onResourceStatus(final ResourceStatusEvent resourceStatus) { LOG.log(Level.INFO, "Sending resource status: {0} ", resourceStatus); // Here, we introduce an arbitrary wait. This is to make sure that at the exit of an Evaluator, the last
