http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/jobmanager/config.html ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/web/partials/jobmanager/config.html b/flink-runtime-web/web-dashboard/web/partials/jobmanager/config.html new file mode 100644 index 0000000..da6b75b --- /dev/null +++ b/flink-runtime-web/web-dashboard/web/partials/jobmanager/config.html @@ -0,0 +1,33 @@ + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +--> +<table class="table table-properties"> + <thead> + <tr> + <th>Key</th> + <th>Value</th> + </tr> + </thead> + <tbody> + <tr ng-repeat="entry in jobmanager.config | orderBy: 'key'"> + <td>{{entry.key}}</td> + <td>{{entry.value}}</td> + </tr> + </tbody> +</table> \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/jobmanager/index.html ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/web/partials/jobmanager/index.html b/flink-runtime-web/web-dashboard/web/partials/jobmanager/index.html new file mode 100644 index 0000000..02c2f47 --- /dev/null +++ b/flink-runtime-web/web-dashboard/web/partials/jobmanager/index.html @@ -0,0 +1,33 @@ + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +--> +<nav class="navbar navbar-default navbar-fixed-top navbar-main"> + <div id="fold-button" ng-click="showSidebar()" class="btn btn-default navbar-btn pull-left"><i class="fa fa-navicon"></i></div> + <div class="navbar-title">Job Manager</div> +</nav> +<nav class="navbar navbar-default navbar-fixed-top navbar-main-additional"> + <ul class="nav nav-tabs"> + <li ui-sref-active="active"><a ui-sref=".config">Configuration</a></li> + <li ui-sref-active="active"><a ui-sref=".log">Logs</a></li> + <li ui-sref-active="active"><a ui-sref=".stdout">Stdout</a></li> + </ul> +</nav> +<div id="content-inner" class="has-navbar-main-additional"> + <div ui-view="details"></div> +</div> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/jobmanager/stdout.html ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/web/partials/jobmanager/stdout.html b/flink-runtime-web/web-dashboard/web/partials/jobmanager/stdout.html new file mode 100644 index 0000000..df6a817 --- /dev/null +++ b/flink-runtime-web/web-dashboard/web/partials/jobmanager/stdout.html @@ -0,0 +1,40 @@ + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + + +--> +<table class="table table-properties"> + <thead> + <tr> + <th colspan="2"> + <div class="row"> + <div class="col-xs-10">Job Manager Output</div> + <div class="col-xs-1 text-right"><a ng-click="reloadData()" class="show-pointer"><i class="fa fa-refresh"></i></a></div> + <div class="col-xs-1 text-left"><a href="jobmanager/stdout"><i class="fa fa-download"></i></a></div> + </div> + </th> + </tr> + </thead> + <tbody> + <tr> + <td colspan="2"> + <pre>{{jobmanager.stdout}}</pre> + </td> + </tr> + </tbody> +</table> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/jobs/completed-jobs.html ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/completed-jobs.html b/flink-runtime-web/web-dashboard/web/partials/jobs/completed-jobs.html new file mode 100644 index 0000000..b76278d --- /dev/null +++ b/flink-runtime-web/web-dashboard/web/partials/jobs/completed-jobs.html @@ -0,0 +1,53 @@ + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +--> +<nav class="navbar navbar-default navbar-fixed-top navbar-main"> + <div id="fold-button" ng-click="showSidebar()" class="btn btn-default navbar-btn pull-left"><i class="fa fa-navicon"></i></div> + <div class="navbar-title">Completed Jobs</div> +</nav> +<div id="content-inner"> + <table class="table table-hover table-clickable"> + <thead> + <tr> + <th>Start Time</th> + <th>End Time</th> + <th>Duration</th> + <th>Job Name</th> + <th>Job ID</th> + <th>Tasks</th> + <th>Status</th> + </tr> + </thead> + <tbody> + <tr ng-repeat="job in jobs" ui-sref="single-job.plan.overview({ jobid: job.jid })"> + <td>{{job['start-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss'}}</td> + <td>{{job['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss'}}</td> + <td>{{job.duration}} ms</td> + <td>{{job.name}}</td> + <td>{{job.jid}}</td> + <td class="label-group"> + <bs-label status="{{status}}" ng-repeat="(status, value) in job.tasks">{{value}}</bs-label> + </td> + <td> + <bs-label status="{{job.state}}">{{job.state}}</bs-label> + </td> + </tr> + </tbody> + </table> +</div> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/jobs/job.config.html ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.config.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.config.html new file mode 100644 index 0000000..a7a5d9d --- /dev/null +++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.config.html @@ -0,0 +1,57 @@ + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +--> +<table ng-if="job['execution-config']" class="table table-properties"> + <thead> + <tr> + <th colspan="2">Execution configuration</th> + </tr> + </thead> + <tbody> + <tr> + <td>Execution mode</td> + <td>{{ job['execution-config']['execution-mode'] }}</td> + </tr> + <tr> + <td>Max. number of execution retries</td> + <td>{{ job['execution-config']['max-execution-retries'] === -1 ? 'deactivated' : job['execution-config']['max-execution-retries'] }}</td> + </tr> + <tr> + <td>Job parallelism</td> + <td>{{ job['execution-config']['job-parallelism'] === -1 ? 'auto' : job['execution-config']['job-parallelism'] }}</td> + </tr> + <tr> + <td>Object reuse mode</td> + <td>{{ job['execution-config']['object-reuse-mode'] }}</td> + </tr> + </tbody> +</table> +<table ng-if="job['execution-config']['user-config']" class="table table-properties"> + <thead> + <tr> + <th colspan="2">User configuration</th> + </tr> + </thead> + <tbody> + <tr ng-repeat="property in job['execution-config']['user-config']"> + <td>{{property.name}}</td> + <td table-property="table-property" value="property.value"></td> + </tr> + </tbody> +</table> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/jobs/job.exceptions.html ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.exceptions.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.exceptions.html new file mode 100644 index 0000000..a5f6676 --- /dev/null +++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.exceptions.html @@ -0,0 +1,38 @@ + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +--> +<div ng-if="exceptions['root-exception']" class="panel panel-default panel-multi"> + <div class="panel-heading clearfix"> + <div class="panel-title">Root exception</div> + </div> + <div class="panel-body"> + <pre class="exception">{{ exceptions['root-exception'] }}</pre> + </div> +</div> +<div ng-repeat="exception in exceptions['all-exceptions']" class="panel panel-default panel-multi"> + <div class="panel-heading clearfix"> + <div class="panel-title">{{ exception.task }}</div> + </div> + <div class="panel-heading clearfix"> + <div class="panel-info thin last"><span>{{ exception.location }}</span></div> + </div> + <div class="panel-body"> + <pre class="exception">{{ exception.exception }}</pre> + </div> +</div> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/jobs/job.html ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.html new file mode 100644 index 0000000..9d3e171 --- /dev/null +++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.html @@ -0,0 +1,48 @@ + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +--> +<nav ng-if="job" class="navbar navbar-default navbar-fixed-top navbar-main"> + <div id="fold-button" ng-click="showSidebar()" class="btn btn-default navbar-btn pull-left"><i class="fa fa-navicon"></i></div> + <div class="navbar-title"> + <indicator-primary status="{{job.state}}"></indicator-primary>{{ job.name }} + </div> + <div class="navbar-info first last hidden-xs hidden-sm">{{ job.jid }}</div> + <div class="navbar-info first last"> + <div class="label-group"> + <bs-label status="{{status}}" ng-repeat="(status, value) in job['status-counts']">{{value}}</bs-label> + </div> + </div> + <div class="navbar-info first last hidden-xs hidden-sm">{{ job['start-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}<span ng-if="job['end-time'] > -1"> + - + {{ job['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}</span></div> + <div ng-if="job.duration > -1" class="navbar-info last first">{{job.duration}} ms</div> +</nav> +<nav ng-if="job" class="navbar navbar-default navbar-fixed-top navbar-main-additional"> + <ul class="nav nav-tabs"> + <li ui-sref-active="active"><a ui-sref=".plan.overview">Plan</a></li> + <li ui-sref-active="active"><a ui-sref=".statistics">Job Accumulators / Statistics</a></li> + <li ui-sref-active="active"><a ui-sref=".timeline">Timeline</a></li> + <li ui-sref-active="active"><a ui-sref=".exceptions">Exceptions</a></li> + <li ui-sref-active="active"><a ui-sref=".properties">Properties</a></li> + <li ui-sref-active="active"><a ui-sref=".config">Configuration</a></li> + </ul> +</nav> +<div id="content-inner" class="has-navbar-main-additional"> + <div ui-view="details"></div> +</div> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.html ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.html new file mode 100644 index 0000000..f2c4143 --- /dev/null +++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.html @@ -0,0 +1,31 @@ + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +--> +<div class="canvas-wrapper"> + <div job-plan="job-plan" plan="plan" jobid="{{jobid}}" set-node="changeNode(nodeid)" class="main-canvas"></div> +</div> +<div ng-if="plan" class="panel panel-default panel-multi"> + <nav class="navbar navbar-default navbar-secondary-additional"> + <ul class="nav nav-tabs"> + <li ui-sref-active="active"><a ui-sref=".overview({nodeid: nodeid})">Overview</a></li> + <li ui-sref-active="active"><a ui-sref=".accumulators({nodeid: nodeid})">Accumulators</a></li> + </ul> + </nav> + <div ui-view="node-details" class="panel-body clean"></div> +</div> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node-list.accumulators.html ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node-list.accumulators.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node-list.accumulators.html new file mode 100644 index 0000000..8de3921 --- /dev/null +++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node-list.accumulators.html @@ -0,0 +1,40 @@ + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +--> +<table class="table table-body-hover table-clickable table-activable"> + <thead> + <tr> + <th>Name</th> + <th>Status</th> + </tr> + </thead> + <tbody ng-repeat="v in job.vertices" ng-class="{ active: v.id == nodeid }" ng-click="v.id == nodeid || changeNode(v.id)"> + <tr ng-if="v.type == 'regular'"> + <td>{{ v.name | humanizeText }}</td> + <td> + <bs-label status="{{v.status}}">{{v.status}}</bs-label> + </td> + </tr> + <tr ng-if="nodeid && v.id == nodeid"> + <td colspan="10"> + <div ng-include=" 'partials/jobs/job.plan.node.accumulators.html' "></div> + </td> + </tr> + </tbody> +</table> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node-list.overview.html ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node-list.overview.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node-list.overview.html new file mode 100644 index 0000000..1706d3e --- /dev/null +++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node-list.overview.html @@ -0,0 +1,60 @@ + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +--> +<table class="table table-body-hover table-clickable table-activable"> + <thead> + <tr> + <th>Start Time</th> + <th>End Time</th> + <th>Duration</th> + <th>Name</th> + <th>Bytes read</th> + <th>Records read</th> + <th>Bytes written</th> + <th>Records written</th> + <th>Tasks</th> + <th>Status</th> + </tr> + </thead> + <tbody ng-repeat="v in job.vertices" ng-class="{ active: v.id == nodeid }" ng-click="changeNode(v.id)"> + <tr ng-if="v.type == 'regular'"> + <td><span ng-if="v['start-time'] > -1">{{ v['start-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}</span></td> + <td><span ng-if="v['end-time'] > -1">{{ v['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}</span></td> + <td><span ng-if="v.duration > -1">{{ v.duration }} ms</span></td> + <td class="td-long">{{ v.name | humanizeText }}</td> + <td>{{ v.metrics['read-bytes'] }}</td> + <td>{{ v.metrics['read-records'] }}</td> + <td>{{ v.metrics['write-bytes'] }}</td> + <td>{{ v.metrics['write-records'] }}</td> + <td> + <div class="label-group"> + <bs-label status="{{status}}" ng-repeat="(index, status) in stateList">{{v.tasks[status]}}</bs-label> + </div> + </td> + <td> + <bs-label status="{{v.status}}">{{v.status}}</bs-label> + </td> + </tr> + <tr ng-if="nodeid && v.id == nodeid"> + <td colspan="10"> + <div ng-include=" 'partials/jobs/job.plan.node.subtasks.html' "></div> + </td> + </tr> + </tbody> +</table> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.accumulators.html ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.accumulators.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.accumulators.html new file mode 100644 index 0000000..e7dcf2c --- /dev/null +++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.accumulators.html @@ -0,0 +1,68 @@ + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +--> +<div ng-if="accumulators.length == 0"> + <p><i>No accumulators</i></p> +</div> +<div ng-if="accumulators && accumulators.length > 0"> + <table class="table table-hover table-clickable table-activable table-inner"> + <thead> + <tr> + <th>Name</th> + <th>Type</th> + <th>Value</th> + </tr> + </thead> + <tbody> + <tr ng-repeat="accumulator in accumulators"> + <td width="30%">{{ accumulator.name }}</td> + <td width="30%">{{ accumulator.type }}</td> + <td width="30%">{{ accumulator.value }}</td> + </tr> + </tbody> + </table> + <div ng-if="!nodeUnfolded"><a ng-click="toggleFold()" class="btn btn-default"> + Show subtasks + <i class="fa fa-chevron-down"></i></a><a ng-click="deactivateNode(); $event.stopPropagation()" title="Fold" class="btn btn-default pull-right"><i class="fa fa-chevron-up"></i></a></div> + <div ng-if="nodeUnfolded && subtaskAccumulators && subtaskAccumulators.length > 0"><a ng-click="toggleFold()" class="btn btn-default"> + Hide subtasks + <i class="fa fa-chevron-up"></i></a> + <table class="table table-hover table-clickable table-activable table-inner"> + <thead> + <tr> + <th>Name</th> + <th>Type</th> + <th>Value</th> + </tr> + </thead> + <tbody ng-if="subtask['user-accumulators'] && subtask['user-accumulators'].length > 0" ng-repeat="subtask in subtaskAccumulators"> + <tr> + <td colwidth="3"> + <div class="small-label">({{ subtask.subtask }}) {{ subtask.host }}, attempt: {{ subtask.attempt + 1 }}</div> + </td> + </tr> + <tr ng-repeat="accumulator in subtask['user-accumulators']"> + <td width="30%">{{ accumulator.name }}</td> + <td width="30%">{{ accumulator.type }}</td> + <td width="30%">{{ accumulator.value }}</td> + </tr> + </tbody> + </table> + </div> +</div> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.subtasks.html ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.subtasks.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.subtasks.html new file mode 100644 index 0000000..40b16bc --- /dev/null +++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.subtasks.html @@ -0,0 +1,52 @@ + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + + +--> +<table ng-if="subtasks" class="table table-hover table-clickable table-activable table-inner"> + <thead> + <tr> + <th>Start Time</th> + <th>End Time</th> + <th>Duration</th> + <th>Bytes read</th> + <th>Records read</th> + <th>Bytes written</th> + <th>Records written</th> + <th>Attempt</th> + <th>Host</th> + <th>Status</th> + </tr> + </thead> + <tbody> + <tr ng-repeat="subtask in subtasks"> + <td><span ng-if="subtask['start-time'] > -1">{{ subtask['start-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}</span></td> + <td><span ng-if="subtask['end-time'] > -1">{{ subtask['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}</span></td> + <td><span ng-if="subtask.duration > -1">{{ subtask.duration }} ms</span></td> + <td><span ng-if="subtask.metrics['read-bytes'] > -1">{{ subtask.metrics['read-bytes'] }}</span></td> + <td><span ng-if="subtask.metrics['read-records'] > -1">{{ subtask.metrics['read-records'] }}</span></td> + <td><span ng-if="subtask.metrics['write-bytes'] > -1">{{ subtask.metrics['write-bytes'] }}</span></td> + <td><span ng-if="subtask.metrics['write-records'] > -1">{{ subtask.metrics['write-records'] }}</span></td> + <td>{{ subtask.attempt + 1 }}</td> + <td>{{ subtask.host }}</td> + <td> + <bs-label status="{{subtask.status}}">{{subtask.status}}</bs-label> + </td> + </tr> + </tbody> +</table> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/jobs/job.properties.html ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.properties.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.properties.html new file mode 100644 index 0000000..907afd3 --- /dev/null +++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.properties.html @@ -0,0 +1,140 @@ + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +--> +<div class="canvas-wrapper"> + <div job-plan="job-plan" plan="plan" jobid="{{jobid}}" set-node="changeNode(nodeid)" class="main-canvas"></div> +</div> +<div ng-if="node" class="panel panel-default"> + <div class="panel-heading clearfix"> + <div class="panel-title">{{ node.description | humanizeText }}</div> + </div> + <div class="panel-body clean"> + <div class="row"> + <div class="col-sm-6 col-md-4"> + <table ng-if="node.optimizer_properties.global_properties" class="table table-properties"> + <thead> + <tr> + <th colspan="2">Global Data Properties</th> + </tr> + </thead> + <tbody> + <tr ng-repeat="property in node.optimizer_properties.global_properties"> + <td>{{property.name}}</td> + <td table-property="table-property" value="property.value"></td> + </tr> + </tbody> + </table> + <table ng-if="node.optimizer_properties.local_properties" class="table table-properties"> + <thead> + <tr> + <th colspan="2">Local Data Properties</th> + </tr> + </thead> + <tbody> + <tr ng-repeat="property in node.optimizer_properties.local_properties"> + <td>{{property.name}}</td> + <td table-property="table-property" value="property.value"></td> + </tr> + </tbody> + </table> + <div class="visible-xs visible-sm"> + <table class="table table-properties"> + <thead> + <tr> + <th colspan="2">Properties</th> + </tr> + </thead> + <tbody> + <tr> + <td>Operator</td> + <td table-property="table-property" value="node.operator_strategy"></td> + </tr> + <tr> + <td>Parallelism</td> + <td table-property="table-property" value="node.parallelism"></td> + </tr> + </tbody> + </table> + </div> + </div> + <div class="hidden-sm col-md-4"> + <table class="table table-properties"> + <thead> + <tr> + <th colspan="2">Properties</th> + </tr> + </thead> + <tbody> + <tr> + <td>Operator</td> + <td table-property="table-property" value="node.operator_strategy"></td> + </tr> + <tr> + <td>Parallelism</td> + <td table-property="table-property" value="node.parallelism"></td> + </tr> + </tbody> + </table> + <table ng-if="node.optimizer_properties.estimates" class="table table-properties"> + <thead> + <tr> + <th colspan="2">Size Estimates</th> + </tr> + </thead> + <tbody> + <tr ng-repeat="property in node.optimizer_properties.estimates"> + <td>{{property.name}}</td> + <td table-property="table-property" value="property.value"></td> + </tr> + </tbody> + </table> + </div> + <div class="col-sm-6 col-md-4"> + <div class="visible-xs visible-sm"> + <table ng-if="node.optimizer_properties.estimates" class="table table-properties"> + <thead> + <tr> + <th colspan="2">Size Estimates</th> + </tr> + </thead> + <tbody> + <tr ng-repeat="property in node.optimizer_properties.estimates"> + <td>{{property.name}}</td> + <td table-property="table-property" value="property.value"></td> + </tr> + </tbody> + </table> + </div> + <table ng-if="node.optimizer_properties.costs" class="table table-properties"> + <thead> + <tr> + <th colspan="2">Cost Estimates</th> + </tr> + </thead> + <tbody> + <tr ng-repeat="property in node.optimizer_properties.costs"> + <td>{{property.name}}</td> + <td table-property="table-property" value="property.value"></td> + </tr> + </tbody> + </table> + </div> + </div> + </div> +</div> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/jobs/job.statistics.html ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.statistics.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.statistics.html new file mode 100644 index 0000000..951cc1c --- /dev/null +++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.statistics.html @@ -0,0 +1,40 @@ + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +--> +<table class="table table-properties"> + <thead> + <tr> + <th colspan="2">Some statistics</th> + </tr> + </thead> + <tbody> + <tr> + <td>Operator</td> + <td>1</td> + </tr> + <tr> + <td>Parallelism</td> + <td>2</td> + </tr> + <tr> + <td>Subtasks-per-instance</td> + <td>3</td> + </tr> + </tbody> +</table> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/jobs/job.timeline.html ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.timeline.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.timeline.html new file mode 100644 index 0000000..2f22576 --- /dev/null +++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.timeline.html @@ -0,0 +1,23 @@ + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +--> +<div class="canvas-wrapper"> + <div timeline="timeline" vertices="vertices" jobid="jobid" class="timeline-canvas"></div> +</div> +<div ui-view="vertex"></div> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/jobs/job.timeline.vertex.html ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.timeline.vertex.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.timeline.vertex.html new file mode 100644 index 0000000..1a4bd06 --- /dev/null +++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.timeline.vertex.html @@ -0,0 +1,30 @@ + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +--> +<div ng-if="vertex" class="panel panel-default panel-multi"> + <div class="panel-heading clearfix"> + <div class="panel-title">{{ vertex.groupvertex.groupvertexname | humanizeText }}</div> + </div> + <div class="panel-body"> + <div class="canvas-wrapper"> + <div vertex="vertex" data="vertex" class="timeline-canvas"></div> + </div> + <div id="timeline1"></div> + </div> +</div> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/jobs/running-jobs.html ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/running-jobs.html b/flink-runtime-web/web-dashboard/web/partials/jobs/running-jobs.html new file mode 100644 index 0000000..e175d07 --- /dev/null +++ b/flink-runtime-web/web-dashboard/web/partials/jobs/running-jobs.html @@ -0,0 +1,53 @@ + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +--> +<nav class="navbar navbar-default navbar-fixed-top navbar-main"> + <div id="fold-button" ng-click="showSidebar()" class="btn btn-default navbar-btn pull-left"><i class="fa fa-navicon"></i></div> + <div class="navbar-title">Running Jobs</div> +</nav> +<div id="content-inner"> + <table class="table table-hover table-clickable"> + <thead> + <tr> + <th>Start Time</th> + <th>End Time</th> + <th>Duration</th> + <th>Job Name</th> + <th>Job ID</th> + <th>Tasks</th> + <th>Status</th> + </tr> + </thead> + <tbody> + <tr ng-repeat="job in jobs" ui-sref="single-job.plan.overview({ jobid: job.jid })"> + <td>{{job['start-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss'}}</td> + <td>{{job['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss'}}</td> + <td>{{job.duration}} ms</td> + <td>{{job.name}}</td> + <td>{{job.jid}}</td> + <td class="label-group"> + <bs-label status="{{status}}" ng-repeat="(status, value) in job.tasks">{{value}}</bs-label> + </td> + <td> + <bs-label status="{{job.state}}">{{job.state}}</bs-label> + </td> + </tr> + </tbody> + </table> +</div> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/overview.html ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/web/partials/overview.html b/flink-runtime-web/web-dashboard/web/partials/overview.html new file mode 100644 index 0000000..ec3c580 --- /dev/null +++ b/flink-runtime-web/web-dashboard/web/partials/overview.html @@ -0,0 +1,147 @@ + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +--> +<nav class="navbar navbar-default navbar-fixed-top navbar-main"> + <div id="fold-button" ng-click="showSidebar()" class="btn btn-default navbar-btn pull-left"><i class="fa fa-navicon"></i></div> + <div class="navbar-title">Overview</div> +</nav> +<div id="content-inner"> + <div class="row"> + <div class="col-md-6"> + <div class="panel panel-default panel-dashboard"> + <div class="panel-heading"> + <div class="row"> + <div class="col-xs-3"><i class="fa fa-tasks fa-3x"></i></div> + <div class="col-xs-9 text-right"> + <div class="huge">{{overview.taskmanagers}}</div> + <div>Task Managers</div> + </div> + </div> + </div> + <div class="panel-heading"> + <div class="row"> + <div class="col-xs-3"><i class="fa fa-folder fa-3x"></i></div> + <div class="col-xs-9 text-right"> + <div class="huge">{{overview["slots-total"]}}</div> + <div>Task Slots</div> + </div> + </div> + </div> + <div class="panel-heading"> + <div class="row"> + <div class="col-xs-3"><i class="fa fa-folder-o fa-3x"></i></div> + <div class="col-xs-9 text-right"> + <div class="huge">{{overview["slots-available"]}}</div> + <div>Available Task Slots</div> + </div> + </div> + </div> + </div> + </div> + <div class="col-md-6"> + <div class="panel panel-default panel-lg"> + <div class="panel-heading">Total Jobs</div> + <div class="list-group"> + <div class="list-group-item"> + <div class="badge badge-primary">{{overview["jobs-running"]}}</div>Running + </div> + <div class="list-group-item"> + <div class="badge badge-success">{{overview["jobs-finished"]}}</div>Finished + </div> + <div class="list-group-item"> + <div class="badge badge-info">{{overview["jobs-cancelled"]}}</div>Canceled + </div> + <div class="list-group-item"> + <div class="badge badge-danger">{{overview["jobs-failed"]}}</div>Failed + </div> + </div> + </div> + </div> + </div> + <div class="panel panel-default"> + <div class="panel-heading"> + <h3 class="panel-title">Running Jobs</h3> + </div> + <div class="panel-body"> + <table class="table table-hover table-clickable"> + <thead> + <tr> + <th>Start Time</th> + <th>End Time</th> + <th>Duration</th> + <th>Job Name</th> + <th>Job ID</th> + <th>Tasks</th> + <th>Status</th> + </tr> + </thead> + <tbody> + <tr ng-repeat="job in runningJobs" ui-sref="single-job.plan.overview({ jobid: job.jid })"> + <td>{{job['start-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss'}}</td> + <td>{{job['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss'}}</td> + <td>{{job.duration}} ms</td> + <td>{{job.name}}</td> + <td>{{job.jid}}</td> + <td class="label-group"> + <bs-label status="{{status}}" ng-repeat="(status, value) in job.tasks">{{value}}</bs-label> + </td> + <td> + <bs-label status="{{job.state}}">{{job.state}}</bs-label> + </td> + </tr> + </tbody> + </table> + </div> + </div> + <div class="panel panel-default"> + <div class="panel-heading"> + <h3 class="panel-title">Completed Jobs</h3> + </div> + <div class="panel-body"> + <table class="table table-hover table-clickable"> + <thead> + <tr> + <th>Start Time</th> + <th>End Time</th> + <th>Duration</th> + <th>Job Name</th> + <th>Job ID</th> + <th>Tasks</th> + <th>Status</th> + </tr> + </thead> + <tbody> + <tr ng-repeat="job in finishedJobs" ui-sref="single-job.plan.overview({ jobid: job.jid })"> + <td>{{job['start-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss'}}</td> + <td>{{job['end-time'] | amDateFormat:'YYYY-MM-DD, H:mm:ss'}}</td> + <td>{{job.duration}} ms</td> + <td>{{job.name}}</td> + <td>{{job.jid}}</td> + <td class="label-group"> + <bs-label status="{{status}}" ng-repeat="(status, value) in job.tasks">{{value}}</bs-label> + </td> + <td> + <bs-label status="{{job.state}}">{{job.state}}</bs-label> + </td> + </tr> + </tbody> + </table> + </div> + </div> +</div> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime-web/web-dashboard/web/partials/taskmanager/index.html ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/web/partials/taskmanager/index.html b/flink-runtime-web/web-dashboard/web/partials/taskmanager/index.html new file mode 100644 index 0000000..bf37409 --- /dev/null +++ b/flink-runtime-web/web-dashboard/web/partials/taskmanager/index.html @@ -0,0 +1,57 @@ + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +--> +<nav class="navbar navbar-default navbar-fixed-top navbar-main"> + <div id="fold-button" ng-click="showSidebar()" class="btn btn-default navbar-btn pull-left"><i class="fa fa-navicon"></i></div> + <div class="navbar-title">Task Managers</div> +</nav> +<div id="content-inner"> + <table class="table table-clickable table-hover"> + <thead> + <tr> + <th>Path, ID</th> + <th>Data Port</th> + <th>Last Heartbeat</th> + <th>All Slots</th> + <th>Free Slots</th> + <th>CPU Cores</th> + <th>Physical Memory</th> + <th>Free Memory</th> + <th>Flink Managed Memory</th> + </tr> + </thead> + <tbody> + <tr ng-repeat="manager in managers" ui-sref="single-manager.metrics({taskmanagerid: manager.id})"> + <td> + {{ manager.path }} + + <div class="small-label">{{ manager.id }}</div> + </td> + <td>{{ manager.dataPort }}</td> + <td>{{ manager.timeSinceLastHeartbeat | amDateFormat:'YYYY-MM-DD, H:mm:ss' }}</td> + <td>{{ manager.slotsNumber }}</td> + <td>{{ manager.freeSlots }}</td> + <td>{{ manager.cpuCores }}</td> + <td>{{ manager.physicalMemory | bytes:MB }}</td> + <td>{{ manager.freeMemory | bytes:MB }}</td> + <td>{{ manager.managedMemory | bytes:MB }}</td> + </tr> + </tbody> + </table> +</div> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java index 8a037ad..4351eb1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java @@ -25,7 +25,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; -import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.util.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,40 +50,27 @@ class FileSystemBlobStore implements BlobStore { private final String basePath; FileSystemBlobStore(Configuration config) throws IOException { - StateBackend stateBackend = StateBackend.fromConfig(config); + String stateBackendBasePath = config.getString( + ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, ""); - if (stateBackend == StateBackend.FILESYSTEM) { - String stateBackendBasePath = config.getString( - ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, ""); - - if (stateBackendBasePath.equals("")) { - throw new IllegalConfigurationException(String.format("Missing configuration for " + - "file system state backend recovery path. Please specify via " + - "'%s' key.", ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH)); - } + if (stateBackendBasePath.equals("")) { + throw new IllegalConfigurationException(String.format("Missing configuration for " + + "file system state backend recovery path. Please specify via " + + "'%s' key.", ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH)); + } - stateBackendBasePath += "/blob"; + stateBackendBasePath += "/blob"; - this.basePath = stateBackendBasePath; + this.basePath = stateBackendBasePath; - try { - FileSystem.get(new URI(basePath)).mkdirs(new Path(basePath)); - } - catch (URISyntaxException e) { - throw new IOException(e); - } - - LOG.info("Created blob directory {}.", basePath); + try { + FileSystem.get(new URI(basePath)).mkdirs(new Path(basePath)); } - else { - // Nothing else support at the moment - throw new IllegalConfigurationException( - String.format("Illegal state backend " + - "configuration '%s'. Please configure '%s' as state " + - "backend and specify the recovery path via '%s' key.", - stateBackend, StateBackend.FILESYSTEM, - ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH)); + catch (URISyntaxException e) { + throw new IOException(e); } + + LOG.info("Created blob directory {}.", basePath); } // - Put ------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java index 62ab440..cb2be64 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java @@ -26,7 +26,7 @@ import org.apache.curator.utils.ZKPaths; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.jobmanager.RecoveryMode; import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.runtime.zookeeper.StateStorageHelper; import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,7 +92,8 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto * @param client The Curator ZooKeeper client * @param checkpointsPath The ZooKeeper path for the checkpoints (needs to * start with a '/') - * @param stateHandleProvider The state handle provider for checkpoints + * @param stateStorage State storage to be used to persist the completed + * checkpoint * @throws Exception */ public ZooKeeperCompletedCheckpointStore( @@ -100,16 +101,16 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto ClassLoader userClassLoader, CuratorFramework client, String checkpointsPath, - StateHandleProvider<CompletedCheckpoint> stateHandleProvider) throws Exception { + StateStorageHelper<CompletedCheckpoint> stateStorage) throws Exception { checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint."); + checkNotNull(stateStorage, "State storage"); this.maxNumberOfCheckpointsToRetain = maxNumberOfCheckpointsToRetain; this.userClassLoader = checkNotNull(userClassLoader, "User class loader"); checkNotNull(client, "Curator client"); checkNotNull(checkpointsPath, "Checkpoints path"); - checkNotNull(stateHandleProvider, "State handle provider"); // Ensure that the checkpoints path exists client.newNamespaceAwareEnsurePath(checkpointsPath) @@ -118,8 +119,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto // All operations will have the path as root this.client = client.usingNamespace(client.getNamespace() + checkpointsPath); - this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>( - this.client, stateHandleProvider); + this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, stateStorage); this.checkpointStateHandles = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1); http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java index 660f8bc..a9ac77a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java @@ -26,7 +26,7 @@ import org.apache.curator.utils.ZKPaths; import org.apache.flink.api.common.JobID; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.runtime.zookeeper.StateStorageHelper; import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; @@ -87,13 +87,21 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore { /** Flag indicating whether this instance is running. */ private boolean isRunning; + /** + * Submitted job graph store backed by ZooKeeper + * + * @param client ZooKeeper client + * @param currentJobsPath ZooKeeper path for current job graphs + * @param stateStorage State storage used to persist the submitted jobs + * @throws Exception + */ public ZooKeeperSubmittedJobGraphStore( CuratorFramework client, String currentJobsPath, - StateHandleProvider<SubmittedJobGraph> stateHandleProvider) throws Exception { + StateStorageHelper<SubmittedJobGraph> stateStorage) throws Exception { checkNotNull(currentJobsPath, "Current jobs path"); - checkNotNull(stateHandleProvider, "State handle provider"); + checkNotNull(stateStorage, "State storage"); // Keep a reference to the original client and not the namespace facade. The namespace // facade cannot be closed. @@ -104,11 +112,11 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore { .ensure(client.getZookeeperClient()); // All operations will have the path as root - client = client.usingNamespace(client.getNamespace() + currentJobsPath); + CuratorFramework facade = client.usingNamespace(client.getNamespace() + currentJobsPath); - this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(client, stateHandleProvider); + this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(facade, stateStorage); - this.pathCache = new PathChildrenCache(client, "/", false); + this.pathCache = new PathChildrenCache(facade, "/", false); pathCache.getListenable().addListener(new SubmittedJobGraphsPathCacheListener()); } http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java new file mode 100644 index 0000000..12250b9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Base class for key/value state implementations that are backed by a regular heap hash map. The + * concrete implementations define how the state is checkpointed. + * + * @param <K> The type of the key. + * @param <V> The type of the value. + * @param <Backend> The type of the backend that snapshots this key/value state. + */ +public abstract class AbstractHeapKvState<K, V, Backend extends StateBackend<Backend>> implements KvState<K, V, Backend> { + + /** Map containing the actual key/value pairs */ + private final HashMap<K, V> state; + + /** The serializer for the keys */ + private final TypeSerializer<K> keySerializer; + + /** The serializer for the values */ + private final TypeSerializer<V> valueSerializer; + + /** The value that is returned when no other value has been associated with a key, yet */ + private final V defaultValue; + + /** The current key, which the next value methods will refer to */ + private K currentKey; + + /** + * Creates a new empty key/value state. + * + * @param keySerializer The serializer for the keys. + * @param valueSerializer The serializer for the values. + * @param defaultValue The value that is returned when no other value has been associated with a key, yet. + */ + protected AbstractHeapKvState(TypeSerializer<K> keySerializer, + TypeSerializer<V> valueSerializer, + V defaultValue) { + this(keySerializer, valueSerializer, defaultValue, new HashMap<K, V>()); + } + + /** + * Creates a new key/value state for the given hash map of key/value pairs. + * + * @param keySerializer The serializer for the keys. + * @param valueSerializer The serializer for the values. + * @param defaultValue The value that is returned when no other value has been associated with a key, yet. + * @param state The state map to use in this kev/value state. May contain initial state. + */ + protected AbstractHeapKvState(TypeSerializer<K> keySerializer, + TypeSerializer<V> valueSerializer, + V defaultValue, + HashMap<K, V> state) { + this.state = requireNonNull(state); + this.keySerializer = requireNonNull(keySerializer); + this.valueSerializer = requireNonNull(valueSerializer); + this.defaultValue = defaultValue; + } + + // ------------------------------------------------------------------------ + + @Override + public V value() { + V value = state.get(currentKey); + return value != null ? value : defaultValue; + } + + @Override + public void update(V value) { + if (value != null) { + state.put(currentKey, value); + } + else { + state.remove(currentKey); + } + } + + @Override + public void setCurrentKey(K currentKey) { + this.currentKey = currentKey; + } + + @Override + public int size() { + return state.size(); + } + + @Override + public void dispose() { + state.clear(); + } + + /** + * Gets the serializer for the keys. + * @return The serializer for the keys. + */ + public TypeSerializer<K> getKeySerializer() { + return keySerializer; + } + + /** + * Gets the serializer for the values. + * @return The serializer for the values. + */ + public TypeSerializer<V> getValueSerializer() { + return valueSerializer; + } + + // ------------------------------------------------------------------------ + // checkpointing utilities + // ------------------------------------------------------------------------ + + protected void writeStateToOutputView(final DataOutputView out) throws IOException { + for (Map.Entry<K, V> entry : state.entrySet()) { + keySerializer.serialize(entry.getKey(), out); + valueSerializer.serialize(entry.getValue(), out); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java new file mode 100644 index 0000000..5cc16a7 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvState.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.state.OperatorState; + +/** + * Key/Value state implementation for user-defined state. The state is backed by a state + * backend, which typically follows one of the following patterns: Either the state is stored + * in the key/value state object directly (meaning in the executing JVM) and snapshotted by the + * state backend into some store (during checkpoints), or the key/value state is in fact backed + * by an external key/value store as the state backend, and checkpoints merely record the + * metadata of what is considered part of the checkpoint. + * + * @param <K> The type of the key. + * @param <V> The type of the value. + */ +public interface KvState<K, V, Backend extends StateBackend<Backend>> extends OperatorState<V> { + + /** + * Sets the current key, which will be used to retrieve values for the next calls to + * {@link #value()} and {@link #update(Object)}. + * + * @param key The key. + */ + void setCurrentKey(K key); + + /** + * Creates a snapshot of this state. + * + * @param checkpointId The ID of the checkpoint for which the snapshot should be created. + * @param timestamp The timestamp of the checkpoint. + * @return A snapshot handle for this key/value state. + * + * @throws Exception Exceptions during snapshotting the state should be forwarded, so the system + * can react to failed snapshots. + */ + KvStateSnapshot<K, V, Backend> shapshot(long checkpointId, long timestamp) throws Exception; + + /** + * Gets the number of key/value pairs currently stored in the state. Note that is a key + * has been associated with "null", the key is removed from the state an will not + * be counted here. + * + * @return The number of key/value pairs currently stored in the state. + */ + int size(); + + /** + * Disposes the key/value state, releasing all occupied resources. + */ + void dispose(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java new file mode 100644 index 0000000..3d6c56c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.typeutils.TypeSerializer; + +/** + * This class represents a snapshot of the {@link KvState}, taken for a checkpoint. Where exactly + * the snapshot stores the snapshot data (in this object, in an external data store, etc) depends + * on the actual implementation. This snapshot defines merely how to restore the state and + * how to discard the state. + * + * <p>One possible implementation is that this snapshot simply contains a copy of the key/value map. + * + * <p>Another possible implementation for this snapshot is that the key/value map is serialized into + * a file and this snapshot object contains a pointer to that file. + * + * @param <K> The type of the key + * @param <V> The type of the value + * @param <Backend> The type of the backend that can restore the state from this snapshot. + */ +public interface KvStateSnapshot<K, V, Backend extends StateBackend<Backend>> extends java.io.Serializable { + + /** + * Loads the key/value state back from this snapshot. + * + * + * @param stateBackend The state backend that created this snapshot and can restore the key/value state + * from this snapshot. + * @param keySerializer The serializer for the keys. + * @param valueSerializer The serializer for the values. + * @param defaultValue The value that is returned when no other value has been associated with a key, yet. + * @param classLoader The class loader for user-defined types. + * + * @return An instance of the key/value state loaded from this snapshot. + * + * @throws Exception Exceptions can occur during the state loading and are forwarded. + */ + KvState<K, V, Backend> restoreState( + Backend stateBackend, + TypeSerializer<K> keySerializer, + TypeSerializer<V> valueSerializer, + V defaultValue, + ClassLoader classLoader) throws Exception; + + + /** + * Discards the state snapshot, removing any resources occupied by it. + * + * @throws Exception Exceptions occurring during the state disposal should be forwarded. + */ + void discardState() throws Exception; +} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/SerializedCheckpointData.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SerializedCheckpointData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SerializedCheckpointData.java new file mode 100644 index 0000000..16ad3fd --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SerializedCheckpointData.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.util.DataInputDeserializer; +import org.apache.flink.runtime.util.DataOutputSerializer; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.List; + +/** + * This class represents serialized checkpoint data for a collection of elements. + */ +public class SerializedCheckpointData implements java.io.Serializable { + + private static final long serialVersionUID = -8783744683896503488L; + + /** ID of the checkpoint for which the IDs are stored */ + private final long checkpointId; + + /** The serialized elements */ + private final byte[] serializedData; + + /** The number of elements in the checkpoint */ + private final int numIds; + + /** + * Creates a SerializedCheckpointData object for the given serialized data. + * + * @param checkpointId The checkpointId of the checkpoint. + * @param serializedData The serialized IDs in this checkpoint. + * @param numIds The number of IDs in the checkpoint. + */ + public SerializedCheckpointData(long checkpointId, byte[] serializedData, int numIds) { + this.checkpointId = checkpointId; + this.serializedData = serializedData; + this.numIds = numIds; + } + + /** + * Gets the checkpointId of the checkpoint. + * @return The checkpointId of the checkpoint. + */ + public long getCheckpointId() { + return checkpointId; + } + + /** + * Gets the binary data for the serialized elements. + * @return The binary data for the serialized elements. + */ + public byte[] getSerializedData() { + return serializedData; + } + + /** + * Gets the number of IDs in the checkpoint. + * @return The number of IDs in the checkpoint. + */ + public int getNumIds() { + return numIds; + } + + // ------------------------------------------------------------------------ + // Serialize to Checkpoint + // ------------------------------------------------------------------------ + + /** + * Converts a list of checkpoints with elements into an array of SerializedCheckpointData. + * + * @param checkpoints The checkpoints to be converted into IdsCheckpointData. + * @param serializer The serializer to serialize the IDs. + * @param <T> The type of the ID. + * @return An array of serializable SerializedCheckpointData, one per entry in the + * + * @throws IOException Thrown, if the serialization fails. + */ + public static <T> SerializedCheckpointData[] fromDeque(ArrayDeque<Tuple2<Long, List<T>>> checkpoints, + TypeSerializer<T> serializer) throws IOException { + return fromDeque(checkpoints, serializer, new DataOutputSerializer(128)); + } + + /** + * Converts a list of checkpoints into an array of SerializedCheckpointData. + * + * @param checkpoints The checkpoints to be converted into IdsCheckpointData. + * @param serializer The serializer to serialize the IDs. + * @param outputBuffer The reusable serialization buffer. + * @param <T> The type of the ID. + * @return An array of serializable SerializedCheckpointData, one per entry in the + * + * @throws IOException Thrown, if the serialization fails. + */ + public static <T> SerializedCheckpointData[] fromDeque(ArrayDeque<Tuple2<Long, List<T>>> checkpoints, + TypeSerializer<T> serializer, + DataOutputSerializer outputBuffer) throws IOException { + SerializedCheckpointData[] serializedCheckpoints = new SerializedCheckpointData[checkpoints.size()]; + + int pos = 0; + for (Tuple2<Long, List<T>> checkpoint : checkpoints) { + outputBuffer.clear(); + List<T> checkpointIds = checkpoint.f1; + + for (T id : checkpointIds) { + serializer.serialize(id, outputBuffer); + } + + serializedCheckpoints[pos++] = new SerializedCheckpointData( + checkpoint.f0, outputBuffer.getCopyOfBuffer(), checkpointIds.size()); + } + + return serializedCheckpoints; + } + + // ------------------------------------------------------------------------ + // De-Serialize from Checkpoint + // ------------------------------------------------------------------------ + + /** + * De-serializes an array of SerializedCheckpointData back into an ArrayDeque of element checkpoints. + * + * @param data The data to be deserialized. + * @param serializer The serializer used to deserialize the data. + * @param <T> The type of the elements. + * @return An ArrayDeque of element checkpoints. + * + * @throws IOException Thrown, if the serialization fails. + */ + public static <T> ArrayDeque<Tuple2<Long, List<T>>> toDeque( + SerializedCheckpointData[] data, TypeSerializer<T> serializer) throws IOException + { + ArrayDeque<Tuple2<Long, List<T>>> deque = new ArrayDeque<>(data.length); + DataInputDeserializer deser = null; + + for (SerializedCheckpointData checkpoint : data) { + byte[] serializedData = checkpoint.getSerializedData(); + if (deser == null) { + deser = new DataInputDeserializer(serializedData, 0, serializedData.length); + } + else { + deser.setBuffer(serializedData, 0, serializedData.length); + } + + final List<T> ids = new ArrayList<>(checkpoint.getNumIds()); + final int numIds = checkpoint.getNumIds(); + + for (int i = 0; i < numIds; i++) { + ids.add(serializer.deserialize(deser)); + } + + deque.addLast(new Tuple2<Long, List<T>>(checkpoint.checkpointId, ids)); + } + + return deque; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java index 7aa1ccf..f8b1cfd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java @@ -18,22 +18,196 @@ package org.apache.flink.runtime.state; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -public enum StateBackend { - JOBMANAGER, FILESYSTEM; +import java.io.IOException; +import java.io.OutputStream; +import java.io.Serializable; + +/** + * A state backend defines how state is stored and snapshotted during checkpoints. + * + * @param <Backend> The type of backend itself. This generic parameter is used to refer to the + * type of backend when creating state backed by this backend. + */ +public abstract class StateBackend<Backend extends StateBackend<Backend>> implements java.io.Serializable { + + private static final long serialVersionUID = 4620413814639220247L; + + // ------------------------------------------------------------------------ + // initialization and cleanup + // ------------------------------------------------------------------------ + + /** + * This method is called by the task upon deployment to initialize the state backend for + * data for a specific job. + * + * @param job The ID of the job for which the state backend instance checkpoints data. + * @throws Exception Overwritten versions of this method may throw exceptions, in which + * case the job that uses the state backend is considered failed during + * deployment. + */ + public abstract void initializeForJob(JobID job) throws Exception; + + /** + * Disposes all state associated with the current job. + * + * @throws Exception Exceptions may occur during disposal of the state and should be forwarded. + */ + public abstract void disposeAllStateForCurrentJob() throws Exception; + + /** + * Closes the state backend, releasing all internal resources, but does not delete any persistent + * checkpoint data. + * + * @throws Exception Exceptions can be forwarded and will be logged by the system + */ + public abstract void close() throws Exception; + + // ------------------------------------------------------------------------ + // key/value state + // ------------------------------------------------------------------------ /** - * Returns the configured {@link StateBackend}. + * Creates a key/value state backed by this state backend. + * + * @param keySerializer The serializer for the key. + * @param valueSerializer The serializer for the value. + * @param defaultValue The value that is returned when no other value has been associated with a key, yet. + * @param <K> The type of the key. + * @param <V> The type of the value. + * + * @return A new key/value state backed by this backend. + * + * @throws Exception Exceptions may occur during initialization of the state and should be forwarded. + */ + public abstract <K, V> KvState<K, V, Backend> createKvState( + TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, + V defaultValue) throws Exception; + + + // ------------------------------------------------------------------------ + // storing state for a checkpoint + // ------------------------------------------------------------------------ + + /** + * Creates an output stream that writes into the state of the given checkpoint. When the stream + * is closes, it returns a state handle that can retrieve the state back. + * + * @param checkpointID The ID of the checkpoint. + * @param timestamp The timestamp of the checkpoint. + * @return An output stream that writes state for the given checkpoint. + * + * @throws Exception Exceptions may occur while creating the stream and should be forwarded. + */ + public abstract CheckpointStateOutputStream createCheckpointStateOutputStream( + long checkpointID, long timestamp) throws Exception; + + /** + * Creates a {@link DataOutputView} stream that writes into the state of the given checkpoint. + * When the stream is closes, it returns a state handle that can retrieve the state back. * - * @param config The config to parse - * @return Configured state backend or {@link ConfigConstants#DEFAULT_RECOVERY_MODE} if not - * configured. - */ - public static StateBackend fromConfig(Configuration config) { - return StateBackend.valueOf(config.getString( - ConfigConstants.STATE_BACKEND, - ConfigConstants.DEFAULT_STATE_BACKEND).toUpperCase()); + * @param checkpointID The ID of the checkpoint. + * @param timestamp The timestamp of the checkpoint. + * @return An DataOutputView stream that writes state for the given checkpoint. + * + * @throws Exception Exceptions may occur while creating the stream and should be forwarded. + */ + public CheckpointStateOutputView createCheckpointStateOutputView( + long checkpointID, long timestamp) throws Exception { + return new CheckpointStateOutputView(createCheckpointStateOutputStream(checkpointID, timestamp)); + } + + /** + * Writes the given state into the checkpoint, and returns a handle that can retrieve the state back. + * + * @param state The state to be checkpointed. + * @param checkpointID The ID of the checkpoint. + * @param timestamp The timestamp of the checkpoint. + * @param <S> The type of the state. + * + * @return A state handle that can retrieve the checkpoined state. + * + * @throws Exception Exceptions may occur during serialization / storing the state and should be forwarded. + */ + public abstract <S extends Serializable> StateHandle<S> checkpointStateSerializable( + S state, long checkpointID, long timestamp) throws Exception; + + + // ------------------------------------------------------------------------ + // Checkpoint state output stream + // ------------------------------------------------------------------------ + + /** + * A dedicated output stream that produces a {@link StreamStateHandle} when closed. + */ + public static abstract class CheckpointStateOutputStream extends OutputStream { + + /** + * Closes the stream and gets a state handle that can create an input stream + * producing the data written to this stream. + * + * @return A state handle that can create an input stream producing the data written to this stream. + * @throws IOException Thrown, if the stream cannot be closed. + */ + public abstract StreamStateHandle closeAndGetHandle() throws IOException; + } + + /** + * A dedicated DataOutputView stream that produces a {@code StateHandle<DataInputView>} when closed. + */ + public static final class CheckpointStateOutputView extends DataOutputViewStreamWrapper { + + private final CheckpointStateOutputStream out; + + public CheckpointStateOutputView(CheckpointStateOutputStream out) { + super(out); + this.out = out; + } + + /** + * Closes the stream and gets a state handle that can create a DataInputView. + * producing the data written to this stream. + * + * @return A state handle that can create an input stream producing the data written to this stream. + * @throws IOException Thrown, if the stream cannot be closed. + */ + public StateHandle<DataInputView> closeAndGetHandle() throws IOException { + return new DataInputViewHandle(out.closeAndGetHandle()); + } + + @Override + public void close() throws IOException { + out.close(); + } + } + + /** + * Simple state handle that resolved a {@link DataInputView} from a StreamStateHandle. + */ + private static final class DataInputViewHandle implements StateHandle<DataInputView> { + + private static final long serialVersionUID = 2891559813513532079L; + + private final StreamStateHandle stream; + + private DataInputViewHandle(StreamStateHandle stream) { + this.stream = stream; + } + + @Override + public DataInputView getState(ClassLoader userCodeClassLoader) throws Exception { + return new DataInputViewStreamWrapper(stream.getState(userCodeClassLoader)); + } + + @Override + public void discardState() throws Exception { + stream.discardState(); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/a6890b28/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java new file mode 100644 index 0000000..5b622eb --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.configuration.Configuration; + +/** + * A factory to create a specific state backend. The state backend creation gets a Configuration + * object that can be used to read further config values. + * + * @param <T> The type of the state backend created. + */ +public interface StateBackendFactory<T extends StateBackend<T>> { + + /** + * Creates the state backend, optionally using the given configuration. + * + * @param config The Flink configuration (loaded by the TaskManager). + * @return The created state backend. + * + * @throws Exception Exceptions during instantiation can be forwarded. + */ + StateBackend<T> createFromConfig(Configuration config) throws Exception; +}
