[https://issues.apache.org/jira/browse/BEAM-4648?focusedWorklogId=122164&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-122164]
ASF GitHub Bot logged work on BEAM-4648:
----------------------------------------
Author: ASF GitHub Bot
Created on: 12/Jul/18 05:07
Start Date: 12/Jul/18 05:07
Worklog Time Spent: 10m
Work Description: charlesccychen closed pull request #5777: [BEAM-4648] Remove experimental Python RPC DirectRunner
URL: https://github.com/apache/beam/pull/5777
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/sdks/python/apache_beam/runners/experimental/__init__.py b/sdks/python/apache_beam/runners/experimental/__init__.py
deleted file mode 100644
index f4f43cbb123..00000000000
--- a/sdks/python/apache_beam/runners/experimental/__init__.py
+++ /dev/null
@@ -1,17 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-from __future__ import absolute_import
diff --git a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/__init__.py b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/__init__.py
deleted file mode 100644
index 0b416aa8e52..00000000000
--- a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/__init__.py
+++ /dev/null
@@ -1,24 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""This is the experimental direct runner for testing the job api that
-sends a runner API proto over the API and then runs it on the other side.
-"""
-
-from __future__ import absolute_import
-
-from apache_beam.runners.experimental.python_rpc_direct.python_rpc_direct_runner import PythonRPCDirectRunner
diff --git a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py
deleted file mode 100644
index 57056abf8b6..00000000000
--- a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py
+++ /dev/null
@@ -1,113 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""A runner implementation that submits a job for remote execution.
-"""
-
-from __future__ import absolute_import
-
-import logging
-import random
-import string
-from builtins import range
-
-import grpc
-
-from apache_beam.portability.api import beam_job_api_pb2
-from apache_beam.runners.job import utils as job_utils
-from apache_beam.runners.job.manager import DockerRPCManager
-from apache_beam.runners.runner import PipelineResult
-from apache_beam.runners.runner import PipelineRunner
-
-__all__ = ['PythonRPCDirectRunner']
-
-
-class PythonRPCDirectRunner(PipelineRunner):
-  """Executes a single pipeline on the local machine inside a container."""
-
-  # A list of PTransformOverride objects to be applied before running a pipeline
-  # using DirectRunner.
-  # Currently this only works for overrides where the input and output types do
-  # not change.
-  # For internal SDK use only. This should not be updated by Beam pipeline
-  # authors.
-  _PTRANSFORM_OVERRIDES = []
-
-  def __init__(self):
-    self._cache = None
-
-  def run(self, pipeline):
-    """Remotely executes entire pipeline or parts reachable from node."""
-
-    # Performing configured PTransform overrides.
-    pipeline.replace_all(PythonRPCDirectRunner._PTRANSFORM_OVERRIDES)
-
-    # Start the RPC co-process
-    manager = DockerRPCManager()
-
-    # Submit the job to the RPC co-process
-    jobName = ('Job-' +
-        ''.join(random.choice(string.ascii_uppercase) for _ in range(6)))
-    options = {k: v for k, v in pipeline._options.get_all_options().items()
-               if v is not None}
-
-    try:
-      response = manager.service.run(beam_job_api_pb2.SubmitJobRequest(
-          pipeline=pipeline.to_runner_api(),
-          pipelineOptions=job_utils.dict_to_struct(options),
-          jobName=jobName))
-
-      logging.info('Submitted a job with id: %s', response.jobId)
-
-      # Return the result object that references the manager instance
-      result = PythonRPCDirectPipelineResult(response.jobId, manager)
-      return result
-    except grpc.RpcError:
-      logging.error('Failed to run the job with name: %s', jobName)
-      raise
-
-
-class PythonRPCDirectPipelineResult(PipelineResult):
-  """Represents the state of a pipeline run on the Dataflow service."""
-
-  def __init__(self, job_id, job_manager):
-    self.job_id = job_id
-    self.manager = job_manager
-
-  @property
-  def state(self):
-    return self.manager.service.getState(
-        beam_job_api_pb2.GetJobStateRequest(jobId=self.job_id))
-
-  def wait_until_finish(self):
-    messages_request = beam_job_api_pb2.JobMessagesRequest(jobId=self.job_id)
-    for message in self.manager.service.getMessageStream(messages_request):
-      if message.HasField('stateResponse'):
-        logging.info(
-            'Current state of job: %s',
-            beam_job_api_pb2.JobState.Enum.Name(
-                message.stateResponse.state))
-      else:
-        logging.info('Message %s', message.messageResponse)
-    logging.info('Job with id: %s in terminal state now.', self.job_id)
-
-  def cancel(self):
-    return self.manager.service.cancel(
-        beam_job_api_pb2.CancelJobRequest(jobId=self.job_id))
-
-  def metrics(self):
-    raise NotImplementedError
diff --git a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py
deleted file mode 100644
index a0397c601a2..00000000000
--- a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/server.py
+++ /dev/null
@@ -1,113 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""A runner implementation that submits a job for remote execution.
-"""
-from __future__ import absolute_import
-
-import time
-import uuid
-from concurrent import futures
-
-import grpc
-
-from apache_beam.options.pipeline_options import PipelineOptions
-from apache_beam.pipeline import Pipeline
-from apache_beam.portability.api import beam_job_api_pb2
-from apache_beam.portability.api import beam_job_api_pb2_grpc
-from apache_beam.runners.runner import PipelineState
-
-_ONE_DAY_IN_SECONDS = 60 * 60 * 24
-
-
-class JobService(beam_job_api_pb2_grpc.JobServiceServicer):
-
-  def __init__(self):
-    self.jobs = {}
-
-  def run(self, request, context):
-    job_id = uuid.uuid4().get_hex()
-    pipeline_result = Pipeline.from_runner_api(
-        request.pipeline,
-        'DirectRunner',
-        PipelineOptions()).run()
-    self.jobs[job_id] = pipeline_result
-    return beam_job_api_pb2.SubmitJobResponse(jobId=job_id)
-
-  def getState(self, request, context):
-    pipeline_result = self.jobs[request.jobId]
-    return beam_job_api_pb2.GetJobStateResponse(
-        state=self._map_state_to_jobState(pipeline_result.state))
-
-  def cancel(self, request, context):
-    pipeline_result = self.jobs[request.jobId]
-    pipeline_result.cancel()
-    return beam_job_api_pb2.CancelJobResponse(
-        state=self._map_state_to_jobState(pipeline_result.state))
-
-  def getMessageStream(self, request, context):
-    pipeline_result = self.jobs[request.jobId]
-    pipeline_result.wait_until_finish()
-    yield beam_job_api_pb2.JobMessagesResponse(
-        stateResponse=beam_job_api_pb2.GetJobStateResponse(
-            state=self._map_state_to_jobState(pipeline_result.state)))
-
-  def getStateStream(self, request, context):
-    context.set_details('Not Implemented for direct runner!')
-    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
-    return
-
-  @staticmethod
-  def _map_state_to_jobState(state):
-    if state == PipelineState.UNKNOWN:
-      return beam_job_api_pb2.JobState.UNSPECIFIED
-    elif state == PipelineState.STOPPED:
-      return beam_job_api_pb2.JobState.STOPPED
-    elif state == PipelineState.RUNNING:
-      return beam_job_api_pb2.JobState.RUNNING
-    elif state == PipelineState.DONE:
-      return beam_job_api_pb2.JobState.DONE
-    elif state == PipelineState.FAILED:
-      return beam_job_api_pb2.JobState.FAILED
-    elif state == PipelineState.CANCELLED:
-      return beam_job_api_pb2.JobState.CANCELLED
-    elif state == PipelineState.UPDATED:
-      return beam_job_api_pb2.JobState.UPDATED
-    elif state == PipelineState.DRAINING:
-      return beam_job_api_pb2.JobState.DRAINING
-    elif state == PipelineState.DRAINED:
-      return beam_job_api_pb2.JobState.DRAINED
-    else:
-      raise ValueError('Unknown pipeline state')
-
-
-def serve():
-  server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
-  beam_job_api_pb2_grpc.add_JobServiceServicer_to_server(JobService(), server)
-
-  server.add_insecure_port('[::]:50051')
-  server.start()
-
-  try:
-    while True:
-      time.sleep(_ONE_DAY_IN_SECONDS)
-  except KeyboardInterrupt:
-    server.stop(0)
-
-
-if __name__ == '__main__':
-  serve()
Issue Time Tracking
-------------------
Worklog Id: (was: 122164)
Time Spent: 1h (was: 50m)
> Remove the unused Python RPC DirectRunner
> -----------------------------------------
>
> Key: BEAM-4648
> URL: https://issues.apache.org/jira/browse/BEAM-4648
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-core
> Reporter: Charles Chen
> Assignee: Charles Chen
> Priority: Major
> Time Spent: 1h
> Remaining Estimate: 0h
>
> We should remove the unused Python RPC DirectRunner here:
> https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/experimental
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)