Repository: incubator-hawq
Updated Branches:
  refs/heads/master 5fae6af7e -> 1584bb264
HAWQ-1152. PXF endIteration function added to Bridge

Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/1584bb26
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/1584bb26
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/1584bb26

Branch: refs/heads/master
Commit: 1584bb2644fe573ae5a3f9f726ac78c3ae4f7933
Parents: 5fae6af
Author: Shivram Mani <[email protected]>
Authored: Tue Nov 15 12:11:07 2016 -0800
Committer: Shivram Mani <[email protected]>
Committed: Tue Nov 15 12:11:07 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/hawq/pxf/service/Bridge.java | 2 ++
 .../java/org/apache/hawq/pxf/service/ReadBridge.java | 15 ++++++++++++---
 .../apache/hawq/pxf/service/ReadSamplingBridge.java | 5 +++++
 .../org/apache/hawq/pxf/service/WriteBridge.java | 8 ++++----
 .../apache/hawq/pxf/service/rest/BridgeResource.java | 5 +++++
 .../hawq/pxf/service/rest/WritableResource.java | 6 ++++++
 6 files changed, 34 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1584bb26/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/Bridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/Bridge.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/Bridge.java
index bfd862a..7160c7c 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/Bridge.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/Bridge.java
@@ -37,4 +37,6 @@ public interface Bridge {
     boolean setNext(DataInputStream inputStream) throws Exception;
 
     boolean isThreadSafe();
+
+    void endIteration() throws Exception;
 }
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1584bb26/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java
index 01a95ab..edd0a99 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadBridge.java
@@ -90,7 +90,6 @@ public class ReadBridge implements Bridge {
             while (outputQueue.isEmpty()) {
                 onerow = fileAccessor.readNextObject();
                 if (onerow == null) {
-                    fileAccessor.closeForRead();
                     output = outputBuilder.getPartialLine();
                     if (output != null) {
                         LOG.warn("A partial record in the end of the fragment");
@@ -110,7 +109,6 @@ public class ReadBridge implements Bridge {
             }
         } catch (IOException ex) {
             if (!isDataException(ex)) {
-                fileAccessor.closeForRead();
                 throw ex;
             }
             output = outputBuilder.getErrorOutput(ex);
@@ -127,13 +125,24 @@ public class ReadBridge implements Bridge {
             }
             output = outputBuilder.getErrorOutput(ex);
         } catch (Exception ex) {
-            fileAccessor.closeForRead();
             throw ex;
         }
 
         return output;
     }
 
+    /**
+     * Close the underlying resource
+     */
+    public void endIteration() throws Exception {
+        try {
+            fileAccessor.closeForRead();
+        } catch (Exception e) {
+            LOG.error("Failed to close bridge resources: " + e.getMessage());
+            throw e;
+        }
+    }
+
     public static ReadAccessor getFileAccessor(InputData inputData)
             throws Exception {
         return (ReadAccessor) Utilities.createAnyInstance(InputData.class,

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1584bb26/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadSamplingBridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadSamplingBridge.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadSamplingBridge.java
index d5ae66a..77c658d 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadSamplingBridge.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/ReadSamplingBridge.java
@@ -125,6 +125,11 @@ public class ReadSamplingBridge implements Bridge {
     }
 
     @Override
+    public void endIteration() throws Exception {
+        bridge.endIteration();
+    }
+
+    @Override
     public boolean isThreadSafe() {
         return bridge.isThreadSafe();
     }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1584bb26/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/WriteBridge.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/WriteBridge.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/WriteBridge.java
index c3ee731..fe3d274 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/WriteBridge.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/WriteBridge.java
@@ -72,23 +72,23 @@ public class WriteBridge implements Bridge {
 
         List<OneField> record = inputBuilder.makeInput(inputStream);
         if (record == null) {
-            close();
             return false;
         }
 
         OneRow onerow = fieldsResolver.setFields(record);
         if (onerow == null) {
-            close();
             return false;
         }
         if (!fileAccessor.writeNextObject(onerow)) {
-            close();
             throw new BadRecordException();
         }
         return true;
     }
 
-    private void close() throws Exception {
+    /*
+     * Close the underlying resource
+     */
+    public void endIteration() throws Exception {
         try {
             fileAccessor.closeForWrite();
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1584bb26/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
index 3a062c3..104f353 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java
@@ -155,6 +155,11 @@ public class BridgeResource extends RestResource {
                     LOG.debug("Stopped streaming fragment " + fragment
                             + " of resource " + dataDir + ", " + recordCount
                             + " records.");
+                    try {
+                        bridge.endIteration();
+                    } catch (Exception e) {
+                        // ignore ... any significant errors should already have been handled
+                    }
                     if (!threadSafe) {
                         unlock(dataDir);
                     }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1584bb26/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/WritableResource.java
----------------------------------------------------------------------
diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/WritableResource.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/WritableResource.java
index a6c8d6b..bc45f1a 100644
--- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/WritableResource.java
+++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/WritableResource.java
@@ -163,6 +163,12 @@ public class WritableResource extends RestResource{
         } catch (Exception ex) {
             LOG.debug("totalWritten so far " + totalWritten + " to " + path);
             throw ex;
+        } finally {
+            try {
+                bridge.endIteration();
+            } catch (Exception e) {
+                // ignore ... any significant errors should already have been handled
+            }
         }
 
         String censuredPath = Utilities.maskNonPrintables(path);
