[
https://issues.apache.org/jira/browse/SQOOP-1803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14345504#comment-14345504
]
Jarek Jarcec Cecho commented on SQOOP-1803:
-------------------------------------------
I was thinking about this one a bit myself. I have a couple of thoughts on
getting data back from the execution engine and I'm wondering what others
think. Please don't hesitate to chime in if I missed any approach.
1) DistributedCache
In addition to [~gwenshap]'s comments about the supportability (and/or debuggability)
of {{DistributedCache}}, to the best of my knowledge it can only be used to distribute
data from the launcher (Sqoop Server) to the child mapreduce tasks. I do not
believe that it can be used the other way around to get files or data from
individual tasks back to the launcher. Looking at the [latest
javadocs|https://hadoop.apache.org/docs/current/api/org/apache/hadoop/filecache/DistributedCache.html],
this still seems to be the case, as the documentation contains a note about the
immutability of the cache once the job is submitted:
{quote}
DistributedCache tracks modification timestamps of the cache files. Clearly the
cache files should not be modified by the application or externally while the
job is executing.
{quote}
*Summary:* I believe that this solution is disqualified for retrieving data
back from the execution engine.
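Just to make the direction constraint concrete, here is a minimal sketch (paths and names are made up) of how {{DistributedCache}} is driven: files are attached on the launcher side before submission and the tasks can only read them; there is no call for a task to push data back.
{code:java}
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class CacheDirectionSketch {
  public static void main(String[] args) throws Exception {
    // Launcher (Sqoop Server) side: cache files are attached *before* submission.
    Job job = Job.getInstance(new Configuration(), "sqoop-cache-sketch");
    job.addCacheFile(new URI("hdfs:///tmp/sqoop/connector-input.properties")); // made-up path
    // ... mapper/input/output setup elided; job.waitForCompletion(true) would submit it.

    // Task side (e.g. in Mapper.setup(context)) can only *read* the cached files:
    //   URI[] cached = context.getCacheFiles();
    // There is no counterpart for a task to push a file back to the launcher, and the
    // javadoc quoted above explicitly forbids modifying the cache while the job runs.
  }
}
{code}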
2) Counters
[Counters|https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapred/Counters.html]
are a nice technique for getting insight into what the mapreduce job is doing.
Multiple mappers can update the counters in parallel and it is mapreduce's
responsibility to ensure that counters from all child tasks are summed up
correctly. The limitation of this solution is that counters can only be
{{long}} based (e.g. no {{String}}, {{Date}}, ...). Also, the counters are
cumulative in nature, so it might be a bit difficult to retrieve discrete
values - we would either need to ensure that only certain mappers/reducers update
a given counter whereas others don't, or figure out a way to decode a single
value when multiple mappers/reducers update one counter at the same time.
*Summary:* While it's not impossible to use counters to retrieve data from the
execution engine, this solution would impose limitations and would be "difficult"
to implement and maintain.
3) HDFS Files
Looking into how others are solving this problem,
[Oozie|http://oozie.apache.org] launcher tasks (= one-map mapreduce jobs)
generate files on HDFS in a predefined directory from where the Oozie server
picks them up to read arbitrary values. This is a neat solution as it
allows us to retrieve any value of any type from any part of the workflow (all
processes can create their own files if needed). The downside is that we would
need to agree on a certain location where the Server and the mapreduce job will
exchange files - this directory must exist and must be accessible by both
Sqoop (running under a system user) and the mapreduce job itself (most likely
running as the end user). I believe that HDFS ACLs can easily be used to
accomplish this.
We would need to be careful here with edge conditions - we would need to make
sure that we clean up old and unused files (job failures, ...) and that we do
not leak any sensitive information onto HDFS.
*Summary:* A possible solution that will support all our use cases, but will be
a bit harder to implement.
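A rough sketch of the file-exchange idea, assuming a made-up exchange directory that both the Server and the tasks can access; the cleanup at the end is exactly the edge case mentioned above.
{code:java}
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsExchangeSketch {
  // Made-up exchange directory that Server and job would have to agree on (ACLs required).
  private static final Path EXCHANGE_DIR = new Path("/sqoop/exchange/job-17");

  // Task side: each task writes a discrete value (e.g. last extracted id) into its own file.
  static void writeTaskState(Configuration conf, String taskId, String value) throws Exception {
    FileSystem fs = FileSystem.get(conf);
    try (FSDataOutputStream out = fs.create(new Path(EXCHANGE_DIR, taskId))) {
      out.write(value.getBytes(StandardCharsets.UTF_8));
    }
  }

  // Server side: after the job succeeds, read every task file and then clean up,
  // so that old/unused files are not left behind on HDFS.
  static void collectAndCleanup(Configuration conf) throws Exception {
    FileSystem fs = FileSystem.get(conf);
    for (FileStatus status : fs.listStatus(EXCHANGE_DIR)) {
      try (BufferedReader in = new BufferedReader(
          new InputStreamReader(fs.open(status.getPath()), StandardCharsets.UTF_8))) {
        System.out.println(status.getPath().getName() + " -> " + in.readLine());
      }
    }
    fs.delete(EXCHANGE_DIR, true);
  }
}
{code}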
4) Server side exchange only
I was also looking into how things currently work in the server and I've
realized something that made me think about this proposal. Back when we were
defining the workflow, the intention was that only the {{Initializer}} is allowed
to generate state, whereas all other parts of the workflow ({{Partitioner}},
{{Extractor}}, {{Loader}} and {{Destroyer}}) should not generate any state and
should only reuse the one that was pre-prepared in the initializer. The reason for
that is that the {{Initializer}} is run only once, whereas all other parts of the
workflow run in parallel and/or do not run on the Sqoop server itself, hence by
allowing state to be generated only in the {{Initializer}} we don't have to deal
with synchronizing the parallel pieces or with limitations in various
execution engines. This intention is persisted in the API: the {{Initializer}}
is given a {{MutableContext}} where the connector developer can set any properties
that will be shared with the rest of the workflow (~ the state), while all other
parts are given only an {{ImmutableContext}} that doesn't allow any changes to the
shared properties. I have to say that we have a small exception in the code base,
because the {{Partitioner}} class generates {{Partition}} objects that can
carry some context as well. However, as the {{Partition}} objects are not
available in the {{Destroyer}}, the connector developer still needs to persist
any state that is required through the entire workflow inside the {{Initializer}}.
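To illustrate the intended split, a small connector-side sketch using {{MutableContext}} / {{ImmutableContext}} from {{org.apache.sqoop.common}} (the property key and value are made up):
{code:java}
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.common.MutableContext;

// Intended state flow inside a connector: the Initializer gets the MutableContext
// (via InitializerContext.getContext()) and is the only place that may create
// shared state; Partitioner/Extractor/Loader/Destroyer only see an ImmutableContext
// and can merely read it back.
class StateFlowSketch {

  // Called from the connector's Initializer.
  static void rememberState(MutableContext initializerContext) {
    initializerContext.setString("incremental.last.value", "12345"); // made-up key/value
  }

  // Called from any later part of the workflow.
  static String readState(ImmutableContext workflowContext) {
    return workflowContext.getString("incremental.last.value");
  }
}
{code}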
Having said that, another option seems to be to simply not retrieve anything
from the execution engine and let the connector update the configuration objects
based on info that the connector generated in the {{Initializer}} - assuming that
the job finished correctly. Looking at the current connectors this should work
well, as we need to update and persist state that is 'locked' at the
{{Initializer}} stage. For database-based connectors the "max value" should be
determined in the initializer (it currently isn't, though), and the same goes for
Kafka and other connectors. The beauty of this approach is that it's simple to
implement and can easily be extended in the future to include data
coming from the execution engine, should there be a need for it (for approach 3,
for example).
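And a purely hypothetical sketch of what this could look like on the server side - the config placeholder is made up and none of these classes exist today; it only shows the "fold the Initializer state back into the config and persist it" step:
{code:java}
import org.apache.sqoop.common.ImmutableContext;

// Hypothetical sketch of option 4: the server never pulls anything from the
// execution engine; after a successful run it takes the state the Initializer
// stored in the context and lets the connector copy it into its config object,
// which would then be persisted into the repository (see SQOOP-1804).
class ServerSideUpdateSketch {

  // Placeholder standing in for the connector's incremental config group.
  static class IncrementalConfig {
    String lastValue;
  }

  static void onJobSuccess(ImmutableContext initializerState, IncrementalConfig config) {
    // Value "locked" by the Initializer before the job ran
    // (e.g. the max value of the check column at that point in time).
    config.lastValue = initializerState.getString("incremental.last.value");
    // A follow-up step would persist the updated config object into the Sqoop repository.
  }
}
{code}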
> JobManager and Execution Engine changes: Support for injecting and pulling
> out configs and job output in connectors
> ----------------------------------------------------------------------------------------------------------------------
>
> Key: SQOOP-1803
> URL: https://issues.apache.org/jira/browse/SQOOP-1803
> Project: Sqoop
> Issue Type: Sub-task
> Reporter: Veena Basavaraj
> Assignee: Veena Basavaraj
> Fix For: 1.99.6
>
>
> The details are in the design wiki; as the implementation happens, more
> discussion can happen here:
> https://cwiki.apache.org/confluence/display/SQOOP/Delta+Fetch+And+Merge+Design#DeltaFetchAndMergeDesign-Howtogetoutputfromconnectortosqoop?
> The goal is to dynamically inject an IncrementalConfig instance into the
> FromJobConfiguration. The current MFromConfig and MToConfig can already hold
> a list of configs, and a strong sentiment was expressed to keep it as a list,
> so why not, for the first time, actually make use of it and group the
> incremental-related configs in one config object?
> This task will prepare the FromJobConfiguration from the job config data and
> the ExtractorContext with the relevant values from the previous job run.
> It will also prepare the ToJobConfiguration from the job config data and the
> LoaderContext with the relevant values from the previous job run, if any.
> We will use DistributedCache to get state information out of the Extractor and
> Loader and finally persist it into the Sqoop repository, depending on
> SQOOP-1804, once the output committer's commit is called.